You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/14 11:55:47 UTC

[1/3] flink git commit: [FLINK-4720] Implement archived ExecutionGraph

Repository: flink
Updated Branches:
  refs/heads/master f6d866817 -> 21e8e2dcf


http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
new file mode 100644
index 0000000..d0566d9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class ArchivedExecutionGraphTest {
+	private static JobVertexID v1ID = new JobVertexID();
+	private static JobVertexID v2ID = new JobVertexID();
+
+	private static ExecutionAttemptID executionWithAccumulatorsID;
+
+	private static ExecutionGraph runtimeGraph;
+
+	@BeforeClass
+	public static void setupExecutionGraph() throws Exception {
+		// -------------------------------------------------------------------------------------------------------------
+		// Setup
+		// -------------------------------------------------------------------------------------------------------------
+
+		v1ID = new JobVertexID();
+		v2ID = new JobVertexID();
+
+		JobVertex v1 = new JobVertex("v1", v1ID);
+		JobVertex v2 = new JobVertex("v2", v2ID);
+
+		v1.setParallelism(1);
+		v2.setParallelism(2);
+
+		List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
+
+		ExecutionConfig config = new ExecutionConfig();
+
+		config.setExecutionMode(ExecutionMode.BATCH_FORCED);
+		config.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+		config.setParallelism(4);
+		config.enableObjectReuse();
+		config.setGlobalJobParameters(new TestJobParameters());
+
+		runtimeGraph = new ExecutionGraph(
+			TestingUtils.defaultExecutionContext(),
+			new JobID(),
+			"test job",
+			new Configuration(),
+			new SerializedValue<>(config),
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
+		runtimeGraph.attachJobGraph(vertices);
+
+		runtimeGraph.enableSnapshotCheckpointing(
+			100,
+			100,
+			100,
+			1,
+			Collections.<ExecutionJobVertex>emptyList(),
+			Collections.<ExecutionJobVertex>emptyList(),
+			Collections.<ExecutionJobVertex>emptyList(),
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1, null),
+			new HeapSavepointStore(),
+			new TestCheckpointStatsTracker());
+
+		Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = new HashMap<>();
+		flinkAccumulators.put(AccumulatorRegistry.Metric.NUM_BYTES_IN, new LongCounter(32));
+
+		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
+		userAccumulators.put("userAcc", new LongCounter(64));
+
+		Execution executionWithAccumulators = runtimeGraph.getJobVertex(v1ID).getTaskVertices()[0].getCurrentExecutionAttempt();
+		executionWithAccumulators.setAccumulators(flinkAccumulators, userAccumulators);
+		executionWithAccumulatorsID = executionWithAccumulators.getAttemptId();
+
+		runtimeGraph.getJobVertex(v2ID).getTaskVertices()[0].getCurrentExecutionAttempt().fail(new RuntimeException("This exception was thrown on purpose."));
+	}
+
+	@Test
+	public void testArchive() throws IOException, ClassNotFoundException {
+		ArchivedExecutionGraph archivedGraph = runtimeGraph.archive();
+
+		compareExecutionGraph(runtimeGraph, archivedGraph);
+	}
+
+	@Test
+	public void testSerialization() throws IOException, ClassNotFoundException {
+		ArchivedExecutionGraph archivedGraph = runtimeGraph.archive();
+
+		verifySerializability(archivedGraph);
+	}
+
+	private static void compareExecutionGraph(AccessExecutionGraph runtimeGraph, AccessExecutionGraph archivedGraph) throws IOException, ClassNotFoundException {
+		assertTrue(archivedGraph.isArchived());
+		// -------------------------------------------------------------------------------------------------------------
+		// ExecutionGraph
+		// -------------------------------------------------------------------------------------------------------------
+		assertEquals(runtimeGraph.getJsonPlan(), archivedGraph.getJsonPlan());
+		assertEquals(runtimeGraph.getJobID(), archivedGraph.getJobID());
+		assertEquals(runtimeGraph.getJobName(), archivedGraph.getJobName());
+		assertEquals(runtimeGraph.getState(), archivedGraph.getState());
+		assertEquals(runtimeGraph.getFailureCauseAsString(), archivedGraph.getFailureCauseAsString());
+		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CREATED), archivedGraph.getStatusTimestamp(JobStatus.CREATED));
+		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RUNNING), archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
+		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILING), archivedGraph.getStatusTimestamp(JobStatus.FAILING));
+		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILED), archivedGraph.getStatusTimestamp(JobStatus.FAILED));
+		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CANCELLING), archivedGraph.getStatusTimestamp(JobStatus.CANCELLING));
+		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CANCELED), archivedGraph.getStatusTimestamp(JobStatus.CANCELED));
+		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FINISHED), archivedGraph.getStatusTimestamp(JobStatus.FINISHED));
+		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING), archivedGraph.getStatusTimestamp(JobStatus.RESTARTING));
+		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED), archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED));
+		assertEquals(runtimeGraph.isStoppable(), archivedGraph.isStoppable());
+
+		// -------------------------------------------------------------------------------------------------------------
+		// JobCheckpointStats
+		// -------------------------------------------------------------------------------------------------------------
+		JobCheckpointStats runtimeStats = runtimeGraph.getCheckpointStatsTracker().getJobStats().get();
+		JobCheckpointStats archivedStats = archivedGraph.getCheckpointStatsTracker().getJobStats().get();
+
+		assertEquals(runtimeStats.getAverageDuration(), archivedStats.getAverageDuration());
+		assertEquals(runtimeStats.getMinDuration(), archivedStats.getMinDuration());
+		assertEquals(runtimeStats.getMaxDuration(), archivedStats.getMaxDuration());
+		assertEquals(runtimeStats.getAverageStateSize(), archivedStats.getAverageStateSize());
+		assertEquals(runtimeStats.getMinStateSize(), archivedStats.getMinStateSize());
+		assertEquals(runtimeStats.getMaxStateSize(), archivedStats.getMaxStateSize());
+		assertEquals(runtimeStats.getCount(), archivedStats.getCount());
+		assertEquals(runtimeStats.getRecentHistory(), archivedStats.getRecentHistory());
+
+		// -------------------------------------------------------------------------------------------------------------
+		// ArchivedExecutionConfig
+		// -------------------------------------------------------------------------------------------------------------
+		ArchivedExecutionConfig runtimeConfig = runtimeGraph.getArchivedExecutionConfig();
+		ArchivedExecutionConfig archivedConfig = archivedGraph.getArchivedExecutionConfig();
+
+		assertEquals(runtimeConfig.getExecutionMode(), archivedConfig.getExecutionMode());
+		assertEquals(runtimeConfig.getParallelism(), archivedConfig.getParallelism());
+		assertEquals(runtimeConfig.getObjectReuseEnabled(), archivedConfig.getObjectReuseEnabled());
+		assertEquals(runtimeConfig.getRestartStrategyDescription(), archivedConfig.getRestartStrategyDescription());
+		assertNotNull(archivedConfig.getGlobalJobParameters().get("hello"));
+		assertEquals(runtimeConfig.getGlobalJobParameters().get("hello"), archivedConfig.getGlobalJobParameters().get("hello"));
+
+		// -------------------------------------------------------------------------------------------------------------
+		// StringifiedAccumulators
+		// -------------------------------------------------------------------------------------------------------------
+		compareStringifiedAccumulators(runtimeGraph.getAccumulatorResultsStringified(), archivedGraph.getAccumulatorResultsStringified());
+		compareSerializedAccumulators(runtimeGraph.getAccumulatorsSerialized(), archivedGraph.getAccumulatorsSerialized());
+		compareFlinkAccumulators(runtimeGraph.getFlinkAccumulators().get(executionWithAccumulatorsID), archivedGraph.getFlinkAccumulators().get(executionWithAccumulatorsID));
+
+		// -------------------------------------------------------------------------------------------------------------
+		// JobVertices
+		// -------------------------------------------------------------------------------------------------------------
+		Map<JobVertexID, ? extends AccessExecutionJobVertex> runtimeVertices = runtimeGraph.getAllVertices();
+		Map<JobVertexID, ? extends AccessExecutionJobVertex> archivedVertices = archivedGraph.getAllVertices();
+
+		for (Map.Entry<JobVertexID, ? extends AccessExecutionJobVertex> vertex : runtimeVertices.entrySet()) {
+			compareExecutionJobVertex(vertex.getValue(), archivedVertices.get(vertex.getKey()));
+		}
+
+		Iterator<? extends AccessExecutionJobVertex> runtimeTopologicalVertices = runtimeGraph.getVerticesTopologically().iterator();
+		Iterator<? extends AccessExecutionJobVertex> archiveTopologicaldVertices = archivedGraph.getVerticesTopologically().iterator();
+
+		while (runtimeTopologicalVertices.hasNext()) {
+			assertTrue(archiveTopologicaldVertices.hasNext());
+			compareExecutionJobVertex(runtimeTopologicalVertices.next(), archiveTopologicaldVertices.next());
+		}
+
+		// -------------------------------------------------------------------------------------------------------------
+		// OperatorCheckpointStats
+		// -------------------------------------------------------------------------------------------------------------
+		CheckpointStatsTracker runtimeTracker = runtimeGraph.getCheckpointStatsTracker();
+		CheckpointStatsTracker archivedTracker = archivedGraph.getCheckpointStatsTracker();
+		compareOperatorCheckpointStats(runtimeTracker.getOperatorStats(v1ID).get(), archivedTracker.getOperatorStats(v1ID).get());
+		compareOperatorCheckpointStats(runtimeTracker.getOperatorStats(v2ID).get(), archivedTracker.getOperatorStats(v2ID).get());
+
+		// -------------------------------------------------------------------------------------------------------------
+		// ExecutionVertices
+		// -------------------------------------------------------------------------------------------------------------
+		Iterator<? extends AccessExecutionVertex> runtimeExecutionVertices = runtimeGraph.getAllExecutionVertices().iterator();
+		Iterator<? extends AccessExecutionVertex> archivedExecutionVertices = archivedGraph.getAllExecutionVertices().iterator();
+
+		while (runtimeExecutionVertices.hasNext()) {
+			assertTrue(archivedExecutionVertices.hasNext());
+			compareExecutionVertex(runtimeExecutionVertices.next(), archivedExecutionVertices.next());
+		}
+	}
+
+	private static void compareExecutionJobVertex(AccessExecutionJobVertex runtimeJobVertex, AccessExecutionJobVertex archivedJobVertex) {
+		assertEquals(runtimeJobVertex.getName(), archivedJobVertex.getName());
+		assertEquals(runtimeJobVertex.getParallelism(), archivedJobVertex.getParallelism());
+		assertEquals(runtimeJobVertex.getMaxParallelism(), archivedJobVertex.getMaxParallelism());
+		assertEquals(runtimeJobVertex.getJobVertexId(), archivedJobVertex.getJobVertexId());
+		assertEquals(runtimeJobVertex.getAggregateState(), archivedJobVertex.getAggregateState());
+
+		compareOperatorCheckpointStats(runtimeJobVertex.getCheckpointStats().get(), archivedJobVertex.getCheckpointStats().get());
+
+		compareStringifiedAccumulators(runtimeJobVertex.getAggregatedUserAccumulatorsStringified(), archivedJobVertex.getAggregatedUserAccumulatorsStringified());
+		compareFlinkAccumulators(runtimeJobVertex.getAggregatedMetricAccumulators(), archivedJobVertex.getAggregatedMetricAccumulators());
+
+		AccessExecutionVertex[] runtimeExecutionVertices = runtimeJobVertex.getTaskVertices();
+		AccessExecutionVertex[] archivedExecutionVertices = archivedJobVertex.getTaskVertices();
+		assertEquals(runtimeExecutionVertices.length, archivedExecutionVertices.length);
+		for (int x = 0; x < runtimeExecutionVertices.length; x++) {
+			compareExecutionVertex(runtimeExecutionVertices[x], archivedExecutionVertices[x]);
+		}
+	}
+
+	private static void compareExecutionVertex(AccessExecutionVertex runtimeVertex, AccessExecutionVertex archivedVertex) {
+		assertEquals(runtimeVertex.getTaskNameWithSubtaskIndex(), archivedVertex.getTaskNameWithSubtaskIndex());
+		assertEquals(runtimeVertex.getParallelSubtaskIndex(), archivedVertex.getParallelSubtaskIndex());
+		assertEquals(runtimeVertex.getExecutionState(), archivedVertex.getExecutionState());
+		assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.CREATED), archivedVertex.getStateTimestamp(ExecutionState.CREATED));
+		assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.SCHEDULED), archivedVertex.getStateTimestamp(ExecutionState.SCHEDULED));
+		assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.DEPLOYING), archivedVertex.getStateTimestamp(ExecutionState.DEPLOYING));
+		assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.RUNNING), archivedVertex.getStateTimestamp(ExecutionState.RUNNING));
+		assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.FINISHED), archivedVertex.getStateTimestamp(ExecutionState.FINISHED));
+		assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.CANCELING), archivedVertex.getStateTimestamp(ExecutionState.CANCELING));
+		assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.CANCELED), archivedVertex.getStateTimestamp(ExecutionState.CANCELED));
+		assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.FAILED), archivedVertex.getStateTimestamp(ExecutionState.FAILED));
+		assertEquals(runtimeVertex.getFailureCauseAsString(), archivedVertex.getFailureCauseAsString());
+		assertEquals(runtimeVertex.getCurrentAssignedResourceLocation(), archivedVertex.getCurrentAssignedResourceLocation());
+
+		compareExecution(runtimeVertex.getCurrentExecutionAttempt(), archivedVertex.getCurrentExecutionAttempt());
+	}
+
+	private static void compareExecution(AccessExecution runtimeExecution, AccessExecution archivedExecution) {
+		assertEquals(runtimeExecution.getAttemptId(), archivedExecution.getAttemptId());
+		assertEquals(runtimeExecution.getAttemptNumber(), archivedExecution.getAttemptNumber());
+		assertArrayEquals(runtimeExecution.getStateTimestamps(), archivedExecution.getStateTimestamps());
+		assertEquals(runtimeExecution.getState(), archivedExecution.getState());
+		assertEquals(runtimeExecution.getAssignedResourceLocation(), archivedExecution.getAssignedResourceLocation());
+		assertEquals(runtimeExecution.getFailureCauseAsString(), archivedExecution.getFailureCauseAsString());
+		assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.CREATED), archivedExecution.getStateTimestamp(ExecutionState.CREATED));
+		assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.SCHEDULED), archivedExecution.getStateTimestamp(ExecutionState.SCHEDULED));
+		assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.DEPLOYING), archivedExecution.getStateTimestamp(ExecutionState.DEPLOYING));
+		assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.RUNNING), archivedExecution.getStateTimestamp(ExecutionState.RUNNING));
+		assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.FINISHED), archivedExecution.getStateTimestamp(ExecutionState.FINISHED));
+		assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.CANCELING), archivedExecution.getStateTimestamp(ExecutionState.CANCELING));
+		assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.CANCELED), archivedExecution.getStateTimestamp(ExecutionState.CANCELED));
+		assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.FAILED), archivedExecution.getStateTimestamp(ExecutionState.FAILED));
+		compareStringifiedAccumulators(runtimeExecution.getUserAccumulatorsStringified(), archivedExecution.getUserAccumulatorsStringified());
+		compareFlinkAccumulators(runtimeExecution.getFlinkAccumulators(), archivedExecution.getFlinkAccumulators());
+		assertEquals(runtimeExecution.getParallelSubtaskIndex(), archivedExecution.getParallelSubtaskIndex());
+	}
+
+	private static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] runtimeAccs, StringifiedAccumulatorResult[] archivedAccs) {
+		assertEquals(runtimeAccs.length, archivedAccs.length);
+
+		for (int x = 0; x < runtimeAccs.length; x++) {
+			StringifiedAccumulatorResult runtimeResult = runtimeAccs[x];
+			StringifiedAccumulatorResult archivedResult = archivedAccs[x];
+
+			assertEquals(runtimeResult.getName(), archivedResult.getName());
+			assertEquals(runtimeResult.getType(), archivedResult.getType());
+			assertEquals(runtimeResult.getValue(), archivedResult.getValue());
+		}
+	}
+
+	private static void compareSerializedAccumulators(Map<String, SerializedValue<Object>> runtimeAccs, Map<String, SerializedValue<Object>> archivedAccs) throws IOException, ClassNotFoundException {
+		assertEquals(runtimeAccs.size(), archivedAccs.size());
+		for (Map.Entry<String, SerializedValue<Object>> runtimeAcc : runtimeAccs.entrySet()) {
+			long runtimeUserAcc = (long) runtimeAcc.getValue().deserializeValue(ClassLoader.getSystemClassLoader());
+			long archivedUserAcc = (long) archivedAccs.get(runtimeAcc.getKey()).deserializeValue(ClassLoader.getSystemClassLoader());
+
+			assertEquals(runtimeUserAcc, archivedUserAcc);
+		}
+	}
+
+	/** Asserts that both maps are null, or that both hold equal local values for the same set of metrics. */
+	private static void compareFlinkAccumulators(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> runtimeAccs, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> archivedAccs) {
+		assertEquals(runtimeAccs == null, archivedAccs == null);
+		if (runtimeAccs != null && archivedAccs != null) {
+			assertEquals(runtimeAccs.size(), archivedAccs.size());
+			for (Map.Entry<AccumulatorRegistry.Metric, Accumulator<?, ?>> runtimeAcc : runtimeAccs.entrySet()) {
+				assertTrue("Archived accumulators lack metric " + runtimeAcc.getKey(), archivedAccs.containsKey(runtimeAcc.getKey()));
+				assertEquals(runtimeAcc.getValue().getLocalValue(), archivedAccs.get(runtimeAcc.getKey()).getLocalValue());
+			}
+		}
+	}
+
+	private static void compareOperatorCheckpointStats(OperatorCheckpointStats runtimeStats, OperatorCheckpointStats archivedStats) {
+		assertEquals(runtimeStats.getNumberOfSubTasks(), archivedStats.getNumberOfSubTasks());
+		assertEquals(runtimeStats.getCheckpointId(), archivedStats.getCheckpointId());
+		assertEquals(runtimeStats.getDuration(), archivedStats.getDuration());
+		assertEquals(runtimeStats.getStateSize(), archivedStats.getStateSize());
+		assertEquals(runtimeStats.getTriggerTimestamp(), archivedStats.getTriggerTimestamp());
+		assertEquals(runtimeStats.getSubTaskDuration(0), archivedStats.getSubTaskDuration(0));
+		assertEquals(runtimeStats.getSubTaskStateSize(0), archivedStats.getSubTaskStateSize(0));
+	}
+
+	private static void verifySerializability(ArchivedExecutionGraph graph) throws IOException, ClassNotFoundException {
+		ArchivedExecutionGraph copy = CommonTestUtils.createCopySerializable(graph);
+		compareExecutionGraph(graph, copy);
+	}
+
+
+	private static class TestCheckpointStatsTracker implements CheckpointStatsTracker {
+
+		@Override
+		public void onCompletedCheckpoint(CompletedCheckpoint checkpoint) {
+		}
+
+		@Override
+		public Option<JobCheckpointStats> getJobStats() {
+			return Option.<JobCheckpointStats>apply(new TestJobCheckpointStats());
+		}
+
+		@Override
+		public Option<OperatorCheckpointStats> getOperatorStats(JobVertexID operatorId) {
+			return Option.<OperatorCheckpointStats>apply(new TestOperatorCheckpointStats(operatorId.getUpperPart()));
+		}
+	}
+
+	private static class TestJobCheckpointStats implements JobCheckpointStats {
+		private static final long serialVersionUID = -2630234917947292836L;
+
+		@Override
+		public List<CheckpointStats> getRecentHistory() {
+			return Collections.emptyList();
+		}
+
+		@Override
+		public long getCount() {
+			return 1;
+		}
+
+		@Override
+		public long getMinDuration() {
+			return 2;
+		}
+
+		@Override
+		public long getMaxDuration() {
+			return 4;
+		}
+
+		@Override
+		public long getAverageDuration() {
+			return 3;
+		}
+
+		@Override
+		public long getMinStateSize() {
+			return 5;
+		}
+
+		@Override
+		public long getMaxStateSize() {
+			return 7;
+		}
+
+		@Override
+		public long getAverageStateSize() {
+			return 6;
+		}
+	}
+
+	private static class TestOperatorCheckpointStats extends OperatorCheckpointStats {
+		private static final long serialVersionUID = -2798640928349528644L;
+
+		public TestOperatorCheckpointStats(long offset) {
+			super(1 + offset, 2 + offset, 3 + offset, 4 + offset, new long[][]{new long[]{5 + offset, 6 + offset}});
+		}
+	}
+
+	private static class TestJobParameters extends ExecutionConfig.GlobalJobParameters {
+		private static final long serialVersionUID = -8118611781035212808L;
+		private Map<String, String> parameters;
+
+		private TestJobParameters() {
+			this.parameters = new HashMap<>();
+			this.parameters.put("hello", "world");
+		}
+
+		@Override
+		public Map<String, String> toMap() {
+			return parameters;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 183477a..3c6ae9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -201,7 +201,7 @@ public class JobManagerTest {
 						// Request the execution graph to get the runtime info
 						jobManagerGateway.tell(new RequestExecutionGraph(jid), testActorGateway);
 
-						final ExecutionGraph eg = expectMsgClass(ExecutionGraphFound.class)
+						final ExecutionGraph eg = (ExecutionGraph) expectMsgClass(ExecutionGraphFound.class)
 							.executionGraph();
 
 						final ExecutionVertex vertex = eg.getJobVertex(sender.getID())

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 450f9fb..8a9a4ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -111,7 +111,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
 		assertTrue(responseExecutionGraph instanceof TestingJobManagerMessages.ExecutionGraphFound);
 
-		ExecutionGraph executionGraph = ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
+		ExecutionGraph executionGraph = (ExecutionGraph) ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
 
 		TestJobStatusListener testListener = new TestJobStatusListener();
 		executionGraph.registerJobStatusListener(testListener);

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index d07c48f..72cf58b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.accumulators.Accumulator
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.ActorGateway
 import org.apache.flink.runtime.jobgraph.JobStatus
 
@@ -37,7 +37,7 @@ object TestingJobManagerMessages {
     def jobID: JobID
   }
 
-  case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
+  case class ExecutionGraphFound(jobID: JobID, executionGraph: AccessExecutionGraph) extends
   ResponseExecutionGraph
 
   case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 1259460..e0e6ecd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -40,7 +40,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -396,7 +395,7 @@ public class QueryableStateITCase extends TestLogger {
 					.map(new Mapper<TestingJobManagerMessages.ExecutionGraphFound, ExecutionGraph>() {
 						@Override
 						public ExecutionGraph apply(ExecutionGraphFound found) {
-							return found.executionGraph();
+							return (ExecutionGraph) found.executionGraph();
 						}
 					}, TEST_ACTOR_SYSTEM.dispatcher());
 			ExecutionGraph eg = Await.result(egFuture, deadline.timeLeft());
@@ -553,12 +552,13 @@ public class QueryableStateITCase extends TestLogger {
 							.mapTo(ClassTag$.MODULE$.<JobFound>apply(JobFound.class)),
 					deadline.timeLeft());
 
-			Throwable failureCause = jobFound.executionGraph().getFailureCause();
+			String failureCause = jobFound.executionGraph().getFailureCauseAsString();
 
-			assertTrue("Not instance of SuppressRestartsException", failureCause instanceof SuppressRestartsException);
-			assertTrue("Not caused by IllegalStateException", failureCause.getCause() instanceof IllegalStateException);
-			Throwable duplicateException = failureCause.getCause();
-			assertTrue("Exception does not contain registration name", duplicateException.getMessage().contains(queryName));
+			assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
+			int causedByIndex = failureCause.indexOf("Caused by: ");
+			String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
+			assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
+			assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
 		} finally {
 			// Free cluster resources
 			if (jobId != null) {


[3/3] flink git commit: [FLINK-4720] Implement archived ExecutionGraph

Posted by ch...@apache.org.
[FLINK-4720] Implement archived ExecutionGraph

This closes #2577.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21e8e2dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21e8e2dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21e8e2dc

Branch: refs/heads/master
Commit: 21e8e2dcf77f9d0dc3a74204626b776d87a9cd15
Parents: f6d8668
Author: zentol <ch...@apache.org>
Authored: Thu Sep 22 14:02:22 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Oct 14 13:55:28 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/api/common/Archiveable.java    |  24 +
 .../api/common/ArchivedExecutionConfig.java     |  73 ++++
 .../flink/api/common/ExecutionConfig.java       |   7 +-
 .../webmonitor/ExecutionGraphHolder.java        |   9 +-
 .../AbstractExecutionGraphRequestHandler.java   |   6 +-
 .../AbstractJobVertexRequestHandler.java        |  10 +-
 .../AbstractSubtaskAttemptRequestHandler.java   |  12 +-
 .../handlers/AbstractSubtaskRequestHandler.java |  10 +-
 .../handlers/JobAccumulatorsHandler.java        |   4 +-
 .../handlers/JobCheckpointsHandler.java         |   4 +-
 .../webmonitor/handlers/JobConfigHandler.java   |  10 +-
 .../webmonitor/handlers/JobDetailsHandler.java  |  13 +-
 .../handlers/JobExceptionsHandler.java          |  19 +-
 .../webmonitor/handlers/JobPlanHandler.java     |   4 +-
 .../handlers/JobVertexAccumulatorsHandler.java  |   4 +-
 .../handlers/JobVertexBackPressureHandler.java  |   9 +-
 .../handlers/JobVertexCheckpointsHandler.java   |  42 +-
 .../handlers/JobVertexDetailsHandler.java       |  10 +-
 .../handlers/JobVertexTaskManagersHandler.java  |  21 +-
 .../SubtaskCurrentAttemptDetailsHandler.java    |   4 +-
 ...taskExecutionAttemptAccumulatorsHandler.java |   6 +-
 .../SubtaskExecutionAttemptDetailsHandler.java  |   6 +-
 .../SubtasksAllAccumulatorsHandler.java         |   8 +-
 .../handlers/SubtasksTimesHandler.java          |  10 +-
 .../BackPressureStatsTrackerITCase.java         |   3 +-
 .../StackTraceSampleCoordinatorITCase.java      |   3 +-
 .../JobVertexCheckpointsHandlerTest.java        |  22 +-
 .../ArchivedCheckpointStatsTracker.java         |  53 +++
 .../checkpoint/stats/CheckpointStats.java       |   4 +-
 .../checkpoint/stats/JobCheckpointStats.java    |   3 +-
 .../stats/OperatorCheckpointStats.java          |   2 +
 .../stats/SimpleCheckpointStatsTracker.java     |   2 +
 .../runtime/executiongraph/AccessExecution.java | 105 +++++
 .../executiongraph/AccessExecutionGraph.java    | 161 +++++++
 .../AccessExecutionJobVertex.java               |  98 +++++
 .../executiongraph/AccessExecutionVertex.java   |  85 ++++
 .../executiongraph/ArchivedExecution.java       | 118 +++++
 .../executiongraph/ArchivedExecutionGraph.java  | 297 +++++++++++++
 .../ArchivedExecutionJobVertex.java             | 136 ++++++
 .../executiongraph/ArchivedExecutionVertex.java |  96 ++++
 .../flink/runtime/executiongraph/Execution.java |  41 +-
 .../runtime/executiongraph/ExecutionGraph.java  | 154 ++++---
 .../executiongraph/ExecutionJobVertex.java      |  61 ++-
 .../runtime/executiongraph/ExecutionVertex.java |  50 +--
 .../archive/ExecutionConfigSummary.java         |  75 ----
 .../runtime/webmonitor/WebMonitorUtils.java     |  14 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   7 +-
 .../runtime/jobmanager/MemoryArchivist.scala    |   4 +-
 .../runtime/messages/ArchiveMessages.scala      |  12 +-
 .../runtime/messages/JobManagerMessages.scala   |   4 +-
 .../checkpoint/CoordinatorShutdownTest.java     |   4 +-
 .../ArchivedExecutionGraphTest.java             | 434 +++++++++++++++++++
 .../runtime/jobmanager/JobManagerTest.java      |   2 +-
 .../LeaderChangeJobRecoveryTest.java            |   2 +-
 .../TestingJobManagerMessages.scala             |   4 +-
 .../flink/test/query/QueryableStateITCase.java  |  14 +-
 56 files changed, 2014 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java b/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
new file mode 100644
index 0000000..09a3a0c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.io.Serializable;
+
+public interface Archiveable<T extends Serializable> {
+	T archive();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
new file mode 100644
index 0000000..faf920d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Serializable class which is created when archiving the job.
+ * It can be used to display job information on the web interface
+ * without having to keep the classloader around after job completion.
+ */
+public class ArchivedExecutionConfig implements Serializable {
+
+	private final String executionMode;
+	private final String restartStrategyDescription;
+	private final int parallelism;
+	private final boolean objectReuseEnabled;
+	private final Map<String, String> globalJobParameters;
+
+	public ArchivedExecutionConfig(ExecutionConfig ec) {
+		executionMode = ec.getExecutionMode().name();
+		if (ec.getRestartStrategy() != null) {
+			restartStrategyDescription = ec.getRestartStrategy().getDescription();
+		} else {
+			restartStrategyDescription = "default";
+		}
+		parallelism = ec.getParallelism();
+		objectReuseEnabled = ec.isObjectReuseEnabled();
+		if (ec.getGlobalJobParameters() != null
+				&& ec.getGlobalJobParameters().toMap() != null) {
+			globalJobParameters = ec.getGlobalJobParameters().toMap();
+		} else {
+			globalJobParameters = Collections.emptyMap();
+		}
+	}
+
+	public String getExecutionMode() {
+		return executionMode;
+	}
+
+	public String getRestartStrategyDescription() {
+		return restartStrategyDescription;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public boolean getObjectReuseEnabled() {
+		return objectReuseEnabled;
+	}
+
+	public Map<String, String> getGlobalJobParameters() {
+		return globalJobParameters;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index aadf867..a0a63b1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -58,7 +58,7 @@ import java.util.Objects;
  * </ul>
  */
 @Public
-public class ExecutionConfig implements Serializable {
+public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -770,6 +770,11 @@ public class ExecutionConfig implements Serializable {
 	public boolean canEqual(Object obj) {
 		return obj instanceof ExecutionConfig;
 	}
+	
+	@Override
+	public ArchivedExecutionConfig archive() {
+		return new ArchivedExecutionConfig(this);
+	}
 
 
 	// ------------------------------ Utilities  ----------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 7691874..3d0cfc0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -47,7 +48,7 @@ public class ExecutionGraphHolder {
 
 	private final FiniteDuration timeout;
 
-	private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
+	private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();
 
 	public ExecutionGraphHolder() {
 		this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
@@ -63,8 +64,8 @@ public class ExecutionGraphHolder {
 	 * @param jid jobID of the execution graph to be retrieved
 	 * @return the retrieved execution graph or null if it is not retrievable
 	 */
-	public ExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
-		ExecutionGraph cached = cache.get(jid);
+	public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
+		AccessExecutionGraph cached = cache.get(jid);
 		if (cached != null) {
 			return cached;
 		}
@@ -78,7 +79,7 @@ public class ExecutionGraphHolder {
 					return null;
 				}
 				else if (result instanceof JobManagerMessages.JobFound) {
-					ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
+					AccessExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
 					cache.put(jid, eg);
 					return eg;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index 16cfb1a..ff28d4e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.NotFoundException;
@@ -53,7 +53,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
 			throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage()); 
 		}
 		
-		ExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
+		AccessExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
 		if (eg == null) {
 			throw new NotFoundException("Could not find job with id " + jid);
 		}
@@ -61,5 +61,5 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
 		return handleRequest(eg, pathParams);
 	}
 	
-	public abstract String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception;
+	public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
index 5b12907..a36f94a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -36,7 +36,7 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
 	}
 
 	@Override
-	public final String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+	public final String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		final String vidString = params.get("vertexid");
 		if (vidString == null) {
 			throw new IllegalArgumentException("vertexId parameter missing");
@@ -50,7 +50,7 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
 			throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
 		}
 
-		final ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
+		final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid);
 		if (jobVertex == null) {
 			throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
 		}
@@ -58,5 +58,5 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
 		return handleRequest(jobVertex, params);
 	}
 	
-	public abstract String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
+	public abstract String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index 672df16..f3a5059 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.util.Map;
@@ -37,7 +37,7 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
 	}
 	
 	@Override
-	public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
 		final String attemptNumberString = params.get("attempt");
 		if (attemptNumberString == null) {
 			throw new RuntimeException("Attempt number parameter missing");
@@ -51,12 +51,12 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
 			throw new RuntimeException("Invalid attempt number parameter");
 		}
 		
-		final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
+		final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
 		if (attempt == currentAttempt.getAttemptNumber()) {
 			return handleRequest(currentAttempt, params);
 		}
 		else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
-			Execution exec = vertex.getPriorExecutionAttempt(attempt);
+			AccessExecution exec = vertex.getPriorExecutionAttempt(attempt);
 			return handleRequest(exec, params);
 		}
 		else {
@@ -64,5 +64,5 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
 		}
 	}
 
-	public abstract String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception;
+	public abstract String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
index 90866c6..d6b279c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.util.Map;
@@ -36,7 +36,7 @@ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexReq
 	}
 
 	@Override
-	public final String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+	public final String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		final String subtaskNumberString = params.get("subtasknum");
 		if (subtaskNumberString == null) {
 			throw new RuntimeException("Subtask number parameter missing");
@@ -54,9 +54,9 @@ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexReq
 			throw new RuntimeException("subtask does not exist: " + subtask); 
 		}
 		
-		final ExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
+		final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
 		return handleRequest(vertex, params);
 	}
 
-	public abstract String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception;
+	public abstract String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index c5418b3..29613a0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
@@ -36,7 +36,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
 	}
 
 	@Override
-	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
 		
 		StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
index b63ab0e..404a14e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import scala.Option;
 
@@ -39,7 +39,7 @@ public class JobCheckpointsHandler extends AbstractExecutionGraphRequestHandler
 	}
 
 	@Override
-	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 75389b1..21639ef 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -22,8 +22,8 @@ import java.io.StringWriter;
 import java.util.Map;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 /**
@@ -36,7 +36,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
@@ -45,7 +45,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 		gen.writeStringField("jid", graph.getJobID().toString());
 		gen.writeStringField("name", graph.getJobName());
 
-		final ExecutionConfigSummary summary = graph.getExecutionConfigSummary();
+		final ArchivedExecutionConfig summary = graph.getArchivedExecutionConfig();
 
 		if (summary != null) {
 			gen.writeObjectFieldStart("execution-config");
@@ -59,7 +59,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 			Map<String, String> ucVals = summary.getGlobalJobParameters();
 			if (ucVals != null) {
 				gen.writeObjectFieldStart("user-config");
-
+				
 				for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
 					gen.writeStringField(ucVal.getKey(), ucVal.getValue());
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 884b859..e7a2a8c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -24,9 +24,10 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -50,7 +51,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		final StringWriter writer = new StringWriter();
 		final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -84,13 +85,13 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 		int[] jobVerticesPerState = new int[ExecutionState.values().length];
 		gen.writeArrayFieldStart("vertices");
 
-		for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
+		for (AccessExecutionJobVertex ejv : graph.getVerticesTopologically()) {
 			int[] tasksPerState = new int[ExecutionState.values().length];
 			long startTime = Long.MAX_VALUE;
 			long endTime = 0;
 			boolean allFinished = true;
 			
-			for (ExecutionVertex vertex : ejv.getTaskVertices()) {
+			for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
 				final ExecutionState state = vertex.getExecutionState();
 				tasksPerState[state.ordinal()]++;
 
@@ -133,7 +134,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 
 			gen.writeStartObject();
 			gen.writeStringField("id", ejv.getJobVertexId().toString());
-			gen.writeStringField("name", ejv.getJobVertex().getName());
+			gen.writeStringField("name", ejv.getName());
 			gen.writeNumberField("parallelism", ejv.getParallelism());
 			gen.writeStringField("status", jobVertexState.name());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index ce154e3..90197d0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -19,11 +19,10 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.util.ExceptionUtils;
 
 import java.io.StringWriter;
 import java.util.Map;
@@ -40,16 +39,16 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
 		gen.writeStartObject();
 		
 		// most important is the root failure cause
-		Throwable rootException = graph.getFailureCause();
+		String rootException = graph.getFailureCauseAsString();
 		if (rootException != null) {
-			gen.writeStringField("root-exception", ExceptionUtils.stringifyException(rootException));
+			gen.writeStringField("root-exception", rootException);
 		}
 
 		// we additionally collect all exceptions (up to a limit) that occurred in the individual tasks
@@ -58,8 +57,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 		int numExceptionsSoFar = 0;
 		boolean truncated = false;
 		
-		for (ExecutionVertex task : graph.getAllExecutionVertices()) {
-			Throwable t = task.getFailureCause();
+		for (AccessExecutionVertex task : graph.getAllExecutionVertices()) {
+			String t = task.getFailureCauseAsString();
 			if (t != null) {
 				if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
 					truncated = true;
@@ -71,8 +70,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 						location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)";
 
 				gen.writeStartObject();
-				gen.writeStringField("exception", ExceptionUtils.stringifyException(t));
-				gen.writeStringField("task", task.getSimpleName());
+				gen.writeStringField("exception", t);
+				gen.writeStringField("task", task.getTaskNameWithSubtaskIndex());
 				gen.writeStringField("location", locationString);
 				gen.writeEndObject();
 				numExceptionsSoFar++;

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index 0389f5a..64f7000 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.util.Map;
@@ -34,7 +34,7 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		return graph.getJsonPlan();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 5df565a..ad4e207 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
@@ -35,7 +35,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
 	}
 
 	@Override
-	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
 		
 		StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index 65f82a3..c5bacf2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -56,9 +58,12 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
 
 	@Override
 	public String handleRequest(
-			ExecutionJobVertex jobVertex,
+			AccessExecutionJobVertex accessJobVertex,
 			Map<String, String> params) throws Exception {
-
+		if (accessJobVertex instanceof ArchivedExecutionJobVertex) {
+			return "";
+		}
+		ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex;
 		try (StringWriter writer = new StringWriter();
 				JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
index 6522de5..8a68ffa 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import scala.Option;
 
@@ -38,36 +37,31 @@ public class JobVertexCheckpointsHandler extends AbstractJobVertexRequestHandler
 	}
 
 	@Override
-	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 		gen.writeStartObject();
 
-		CheckpointStatsTracker tracker = jobVertex.getGraph().getCheckpointStatsTracker();
+		Option<OperatorCheckpointStats> statsOption = jobVertex.getCheckpointStats();
 
-		if (tracker != null) {
-			Option<OperatorCheckpointStats> statsOption = tracker
-					.getOperatorStats(jobVertex.getJobVertexId());
+		if (statsOption.isDefined()) {
+			OperatorCheckpointStats stats = statsOption.get();
 
-			if (statsOption.isDefined()) {
-				OperatorCheckpointStats stats = statsOption.get();
+			gen.writeNumberField("id", stats.getCheckpointId());
+			gen.writeNumberField("timestamp", stats.getTriggerTimestamp());
+			gen.writeNumberField("duration", stats.getDuration());
+			gen.writeNumberField("size", stats.getStateSize());
+			gen.writeNumberField("parallelism", stats.getNumberOfSubTasks());
 
-				gen.writeNumberField("id", stats.getCheckpointId());
-				gen.writeNumberField("timestamp", stats.getTriggerTimestamp());
-				gen.writeNumberField("duration", stats.getDuration());
-				gen.writeNumberField("size", stats.getStateSize());
-				gen.writeNumberField("parallelism", stats.getNumberOfSubTasks());
-
-				gen.writeArrayFieldStart("subtasks");
-				for (int i = 0; i < stats.getNumberOfSubTasks(); i++) {
-					gen.writeStartObject();
-					gen.writeNumberField("subtask", i);
-					gen.writeNumberField("duration", stats.getSubTaskDuration(i));
-					gen.writeNumberField("size", stats.getSubTaskStateSize(i));
-					gen.writeEndObject();
-				}
-				gen.writeEndArray();
+			gen.writeArrayFieldStart("subtasks");
+			for (int i = 0; i < stats.getNumberOfSubTasks(); i++) {
+				gen.writeStartObject();
+				gen.writeNumberField("subtask", i);
+				gen.writeNumberField("duration", stats.getSubTaskDuration(i));
+				gen.writeNumberField("size", stats.getSubTaskStateSize(i));
+				gen.writeEndObject();
 			}
+			gen.writeEndArray();
 		}
 
 		gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 813ecb8..fbdd86b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -43,7 +43,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		final long now = System.currentTimeMillis();
 		
 		StringWriter writer = new StringWriter();
@@ -52,13 +52,13 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 		gen.writeStartObject();
 
 		gen.writeStringField("id", jobVertex.getJobVertexId().toString());
-		gen.writeStringField("name", jobVertex.getJobVertex().getName());
+		gen.writeStringField("name", jobVertex.getName());
 		gen.writeNumberField("parallelism", jobVertex.getParallelism());
 		gen.writeNumberField("now", now);
 
 		gen.writeArrayFieldStart("subtasks");
 		int num = 0;
-		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
 			final ExecutionState status = vertex.getExecutionState();
 			
 			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index cbdb87f..0e94334 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -23,8 +23,9 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -46,18 +47,18 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 	}
 
 	@Override
-	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		// Build a map that groups tasks by TaskManager
-		Map<String, List<ExecutionVertex>> taskManagerVertices = new HashMap<>();
+		Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
 
-		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
 			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
 			String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
 
-			List<ExecutionVertex> vertices = taskManagerVertices.get(taskManager);
+			List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager);
 
 			if (vertices == null) {
-				vertices = new ArrayList<ExecutionVertex>();
+				vertices = new ArrayList<>();
 				taskManagerVertices.put(taskManager, vertices);
 			}
 
@@ -73,13 +74,13 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 		gen.writeStartObject();
 
 		gen.writeStringField("id", jobVertex.getJobVertexId().toString());
-		gen.writeStringField("name", jobVertex.getJobVertex().getName());
+		gen.writeStringField("name", jobVertex.getName());
 		gen.writeNumberField("now", now);
 
 		gen.writeArrayFieldStart("taskmanagers");
-		for (Entry<String, List<ExecutionVertex>> entry : taskManagerVertices.entrySet()) {
+		for (Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
 			String host = entry.getKey();
-			List<ExecutionVertex> taskVertices = entry.getValue();
+			List<AccessExecutionVertex> taskVertices = entry.getValue();
 
 			int[] tasksPerState = new int[ExecutionState.values().length];
 
@@ -92,7 +93,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 			LongCounter tmReadRecords = new LongCounter();
 			LongCounter tmWriteRecords = new LongCounter();
 
-			for (ExecutionVertex vertex : taskVertices) {
+			for (AccessExecutionVertex vertex : taskVertices) {
 				final ExecutionState state = vertex.getExecutionState();
 				tasksPerState[state.ordinal()]++;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index d301bd1..811bea6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.util.Map;
@@ -33,7 +33,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
 	}
 
 	@Override
-	public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
 		return handleRequest(vertex.getCurrentExecutionAttempt(), params);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 14ccc0c..786f5e8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
@@ -38,7 +38,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 	}
 
 	@Override
-	public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
 		final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
 		
 		StringWriter writer = new StringWriter();
@@ -46,7 +46,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
 		gen.writeStartObject();
 
-		gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex());
+		gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex());
 		gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
 		gen.writeStringField("id", execAttempt.getAttemptId().toString());
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index a1e6d0e..3cc7376 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -41,7 +41,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 	}
 
 	@Override
-	public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
 		final ExecutionState status = execAttempt.getState();
 		final long now = System.currentTimeMillis();
 
@@ -78,7 +78,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
 		gen.writeStartObject();
-		gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex());
+		gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex());
 		gen.writeStringField("status", status.name());
 		gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
 		gen.writeStringField("host", locationString);

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 780bd4b..892a606 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -39,7 +39,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 	}
 
 	@Override
-	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -50,7 +50,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 		gen.writeArrayFieldStart("subtasks");
 		
 		int num = 0;
-		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
 
 			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
 			String locationString = location == null ? "(unassigned)" : location.getHostname();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 9e6276d..76349ee 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -41,7 +41,7 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 	}
 
 	@Override
-	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
 		final long now = System.currentTimeMillis();
 
 		StringWriter writer = new StringWriter();
@@ -50,13 +50,13 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 		gen.writeStartObject();
 
 		gen.writeStringField("id", jobVertex.getJobVertexId().toString());
-		gen.writeStringField("name", jobVertex.getJobVertex().getName());
+		gen.writeStringField("name", jobVertex.getName());
 		gen.writeNumberField("now", now);
 		
 		gen.writeArrayFieldStart("subtasks");
 
 		int num = 0;
-		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
 			
 			long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
 			ExecutionState status = vertex.getExecutionState();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 25dc189..507c977 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
@@ -153,7 +152,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 							ExecutionGraphFound executionGraphResponse =
 									expectMsgClass(ExecutionGraphFound.class);
 
-							ExecutionGraph executionGraph = executionGraphResponse.executionGraph();
+							ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
 							ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
 
 							StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 9b1f608..c4ce9d1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -125,7 +124,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 								jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor);
 								ExecutionGraphFound executionGraphResponse =
 										expectMsgClass(ExecutionGraphFound.class);
-								ExecutionGraph executionGraph = executionGraphResponse.executionGraph();
+								ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
 								ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
 
 								StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
index f882663..18aae35 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
@@ -20,9 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -37,8 +35,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -49,10 +45,9 @@ public class JobVertexCheckpointsHandlerTest {
 		JobVertexCheckpointsHandler handler = new JobVertexCheckpointsHandler(
 				mock(ExecutionGraphHolder.class));
 
-		ExecutionGraph graph = mock(ExecutionGraph.class);
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-
-		when(vertex.getGraph()).thenReturn(graph);
+		when(vertex.getCheckpointStats())
+			.thenReturn(Option.<OperatorCheckpointStats>empty());
 
 		String response = handler.handleRequest(vertex, Collections.<String, String>emptyMap());
 
@@ -65,15 +60,10 @@ public class JobVertexCheckpointsHandlerTest {
 		JobVertexCheckpointsHandler handler = new JobVertexCheckpointsHandler(
 				mock(ExecutionGraphHolder.class));
 
-		ExecutionGraph graph = mock(ExecutionGraph.class);
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-
-		when(vertex.getGraph()).thenReturn(graph);
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
 
 		// No stats
-		when(tracker.getOperatorStats(any(JobVertexID.class)))
+		when(vertex.getCheckpointStats())
 				.thenReturn(Option.<OperatorCheckpointStats>empty());
 
 		String response = handler.handleRequest(vertex, Collections.<String, String>emptyMap());
@@ -89,13 +79,9 @@ public class JobVertexCheckpointsHandlerTest {
 
 		JobVertexID vertexId = new JobVertexID();
 
-		ExecutionGraph graph = mock(ExecutionGraph.class);
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
 
 		when(vertex.getJobVertexId()).thenReturn(vertexId);
-		when(vertex.getGraph()).thenReturn(graph);
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
 
 		long[][] subTaskStats = new long[][] {
 				new long[] { 1, 10 },
@@ -113,7 +99,7 @@ public class JobVertexCheckpointsHandlerTest {
 		OperatorCheckpointStats stats = new OperatorCheckpointStats(
 				3, 6812, 2800, 1024, subTaskStats);
 
-		when(tracker.getOperatorStats(eq(vertexId)))
+		when(vertex.getCheckpointStats())
 				.thenReturn(Option.apply(stats));
 
 		// Request stats

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
new file mode 100644
index 0000000..92df7d7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import scala.Option;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class ArchivedCheckpointStatsTracker implements CheckpointStatsTracker, Serializable {
+	private static final long serialVersionUID = 1469003563086353555L;
+
+	private final Option<JobCheckpointStats> jobStats;
+	private final Map<JobVertexID, OperatorCheckpointStats> operatorStats;
+
+	public ArchivedCheckpointStatsTracker(Option<JobCheckpointStats> jobStats, Map<JobVertexID, OperatorCheckpointStats> operatorStats) {
+		this.jobStats = jobStats;
+		this.operatorStats = operatorStats;
+	}
+
+	@Override
+	public void onCompletedCheckpoint(CompletedCheckpoint checkpoint) {
+	}
+
+	@Override
+	public Option<JobCheckpointStats> getJobStats() {
+		return jobStats;
+	}
+
+	@Override
+	public Option<OperatorCheckpointStats> getOperatorStats(JobVertexID operatorId) {
+		return Option.apply(operatorStats.get(operatorId));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
index dc239dd..64f17d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.checkpoint.stats;
 
+import java.io.Serializable;
+
 /**
  * Statistics for a specific checkpoint.
  */
-public class CheckpointStats {
+public class CheckpointStats implements Serializable {
 
 	/** ID of the checkpoint. */
 	private final long checkpointId;

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
index b1d7ff2..e156c8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
@@ -20,12 +20,13 @@ package org.apache.flink.runtime.checkpoint.stats;
 
 import org.apache.flink.configuration.ConfigConstants;
 
+import java.io.Serializable;
 import java.util.List;
 
 /**
  * Snapshot of checkpoint statistics for a job.
  */
-public interface JobCheckpointStats {
+public interface JobCheckpointStats extends Serializable {
 
 	// ------------------------------------------------------------------------
 	// General stats

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
index 5b113d8..6c2d497 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
@@ -27,6 +27,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class OperatorCheckpointStats extends CheckpointStats {
 
+	private static final long serialVersionUID = -1594736655739376140L;
+
 	/** Duration in milliseconds and state sizes in bytes per sub task. */
 	private final long[][] subTaskStats;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index db8a0e0..39fbad5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -348,6 +348,8 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 	 */
 	private static class JobCheckpointStatsSnapshot implements JobCheckpointStats {
 
+		private static final long serialVersionUID = 7558212015099742418L;
+
 		// General
 		private final List<CheckpointStats> recentHistory;
 		private final long count;

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
new file mode 100644
index 0000000..aefc17d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Map;
+
+/**
+ * Common interface for the runtime {@link Execution} and {@link ArchivedExecution}.
+ */
+public interface AccessExecution {
+	/**
+	 * Returns the {@link ExecutionAttemptID} for this Execution.
+	 *
+	 * @return ExecutionAttemptID for this execution
+	 */
+	ExecutionAttemptID getAttemptId();
+
+	/**
+	 * Returns the attempt number for this execution.
+	 *
+	 * @return attempt number for this execution.
+	 */
+	int getAttemptNumber();
+
+	/**
+	 * Returns the timestamps for every {@link ExecutionState}.
+	 *
+	 * @return timestamps for each state
+	 */
+	long[] getStateTimestamps();
+
+	/**
+	 * Returns the current {@link ExecutionState} for this execution.
+	 *
+	 * @return execution state for this execution
+	 */
+	ExecutionState getState();
+
+	/**
+	 * Returns the {@link TaskManagerLocation} for this execution.
+	 *
+	 * @return taskmanager location for this execution.
+	 */
+	TaskManagerLocation getAssignedResourceLocation();
+
+	/**
+	 * Returns the exception that caused the job to fail. This is the first root exception
+	 * that was not recoverable and triggered job failure.
+	 *
+	 * @return failure exception as a string, or {@code "(null)"}
+	 */
+	String getFailureCauseAsString();
+
+	/**
+	 * Returns the timestamp for the given {@link ExecutionState}.
+	 *
+	 * @param state state for which the timestamp should be returned
+	 * @return timestamp for the given state
+	 */
+	long getStateTimestamp(ExecutionState state);
+
+	/**
+	 * Returns the user-defined accumulators as strings.
+	 *
+	 * @return user-defined accumulators as strings.
+	 */
+	StringifiedAccumulatorResult[] getUserAccumulatorsStringified();
+
+	/**
+	 * Returns the system-defined accumulators.
+	 *
+	 * @return system-defined accumulators.
+	 * @deprecated Will be removed in FLINK-4527
+	 */
+	@Deprecated
+	Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators();
+
+	/**
+	 * Returns the subtask index of this execution.
+	 *
+	 * @return subtask index of this execution.
+	 */
+	int getParallelSubtaskIndex();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
new file mode 100644
index 0000000..0ff6ace
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Common interface for the runtime {@link ExecutionGraph} and {@link ArchivedExecutionGraph}.
+ */
+public interface AccessExecutionGraph {
+	/**
+	 * Returns the job plan as a JSON string.
+	 *
+	 * @return job plan as a JSON string
+	 */
+	String getJsonPlan();
+
+	/**
+	 * Returns the {@link JobID} for this execution graph.
+	 *
+	 * @return job ID for this execution graph
+	 */
+	JobID getJobID();
+
+	/**
+	 * Returns the job name for this execution graph.
+	 *
+	 * @return job name for this execution graph
+	 */
+	String getJobName();
+
+	/**
+	 * Returns the current {@link JobStatus} for this execution graph.
+	 *
+	 * @return job status for this execution graph
+	 */
+	JobStatus getState();
+
+	/**
+	 * Returns the exception that caused the job to fail. This is the first root exception
+	 * that was not recoverable and triggered job failure.
+	 *
+	 * @return failure causing exception as a string, or {@code "(null)"}
+	 */
+	String getFailureCauseAsString();
+
+	/**
+	 * Returns the job vertex for the given {@link JobVertexID}.
+	 *
+	 * @param id id of job vertex to be returned
+	 * @return job vertex for the given id, or null
+	 */
+	AccessExecutionJobVertex getJobVertex(JobVertexID id);
+
+	/**
+	 * Returns a map containing all job vertices for this execution graph.
+	 *
+	 * @return map containing all job vertices for this execution graph
+	 */
+	Map<JobVertexID, ? extends AccessExecutionJobVertex> getAllVertices();
+
+	/**
+	 * Returns an iterable containing all job vertices for this execution graph in the order they were created.
+	 *
+	 * @return iterable containing all job vertices for this execution graph in the order they were created
+	 */
+	Iterable<? extends AccessExecutionJobVertex> getVerticesTopologically();
+
+	/**
+	 * Returns an iterable containing all execution vertices for this execution graph.
+	 *
+	 * @return iterable containing all execution vertices for this execution graph
+	 */
+	Iterable<? extends AccessExecutionVertex> getAllExecutionVertices();
+
+	/**
+	 * Returns the timestamp for the given {@link JobStatus}.
+	 *
+	 * @param status status for which the timestamp should be returned
+	 * @return timestamp for the given job status
+	 */
+	long getStatusTimestamp(JobStatus status);
+
+	/**
+	 * Returns the {@link CheckpointStatsTracker} for this execution graph.
+	 *
+	 * @return CheckpointStatsTracker for this execution graph
+	 */
+	CheckpointStatsTracker getCheckpointStatsTracker();
+
+	/**
+	 * Returns the {@link ArchivedExecutionConfig} for this execution graph.
+	 *
+	 * @return execution config summary for this execution graph, or null in case of errors
+	 */
+	ArchivedExecutionConfig getArchivedExecutionConfig();
+
+	/**
+	 * Returns whether the job for this execution graph is stoppable.
+	 *
+	 * @return true, if all source tasks are stoppable, false otherwise
+	 */
+	boolean isStoppable();
+
+	/**
+	 * Returns the aggregated user-defined accumulators as strings.
+	 *
+	 * @return aggregated user-defined accumulators as strings.
+	 */
+	StringifiedAccumulatorResult[] getAccumulatorResultsStringified();
+
+	/**
+	 * Returns a map containing the serialized values of user-defined accumulators.
+	 *
+	 * @return map containing serialized values of user-defined accumulators
+	 * @throws IOException indicates that the serialization has failed
+	 */
+	Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException;
+
+	/**
+	 * Returns the aggregated system-defined accumulators.
+	 *
+	 * @return aggregated system-defined accumulators.
+	 * @deprecated Will be removed in FLINK-4527
+	 */
+	@Deprecated
+	Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators();
+
+	/**
+	 * Returns whether this execution graph was archived.
+	 *
+	 * @return true, if the execution graph was archived, false otherwise
+	 */
+	boolean isArchived();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
new file mode 100644
index 0000000..c9bf604
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import scala.Option;
+
+import java.util.Map;
+
+/**
+ * Common interface for the runtime {@link ExecutionJobVertex} and {@link ArchivedExecutionJobVertex}.
+ */
+public interface AccessExecutionJobVertex {
+	/**
+	 * Returns the name for this job vertex.
+	 *
+	 * @return name for this job vertex.
+	 */
+	String getName();
+
+	/**
+	 * Returns the parallelism for this job vertex.
+	 *
+	 * @return parallelism for this job vertex.
+	 */
+	int getParallelism();
+
+	/**
+	 * Returns the max parallelism for this job vertex.
+	 *
+	 * @return max parallelism for this job vertex.
+	 */
+	int getMaxParallelism();
+
+	/**
+	 * Returns the {@link JobVertexID} for this job vertex.
+	 *
+	 * @return JobVertexID for this job vertex.
+	 */
+	JobVertexID getJobVertexId();
+
+	/**
+	 * Returns all execution vertices for this job vertex.
+	 *
+	 * @return all execution vertices for this job vertex
+	 */
+	AccessExecutionVertex[] getTaskVertices();
+
+	/**
+	 * Returns the aggregated {@link ExecutionState} for this job vertex.
+	 *
+	 * @return aggregated state for this job vertex
+	 */
+	ExecutionState getAggregateState();
+
+	/**
+	 * Returns the {@link OperatorCheckpointStats} for this job vertex.
+	 *
+	 * @return checkpoint stats for this job vertex.
+	 */
+	Option<OperatorCheckpointStats> getCheckpointStats();
+
+	/**
+	 * Returns the aggregated system-defined accumulators.
+	 *
+	 * @return aggregated system-defined accumulators.
+	 * @deprecated Will be removed in FLINK-4527
+	 */
+	@Deprecated
+	Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators();
+
+	/**
+	 * Returns the aggregated user-defined accumulators as strings.
+	 *
+	 * @return aggregated user-defined accumulators as strings.
+	 */
+	StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified();
+}


[2/3] flink git commit: [FLINK-4720] Implement archived ExecutionGraph

Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
new file mode 100644
index 0000000..9faf3fb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Common interface for the runtime {@link ExecutionVertex} and {@link ArchivedExecutionVertex}.
+ */
+public interface AccessExecutionVertex {
+	/**
+	 * Returns the name of this execution vertex in the format "myTask (2/7)".
+	 *
+	 * @return name of this execution vertex
+	 */
+	String getTaskNameWithSubtaskIndex();
+
+	/**
+	 * Returns the subtask index of this execution vertex.
+	 *
+	 * @return subtask index of this execution vertex.
+	 */
+	int getParallelSubtaskIndex();
+
+	/**
+	 * Returns the current execution for this execution vertex.
+	 *
+	 * @return current execution
+	 */
+	AccessExecution getCurrentExecutionAttempt();
+
+	/**
+	 * Returns the current {@link ExecutionState} for this execution vertex.
+	 *
+	 * @return execution state for this execution vertex
+	 */
+	ExecutionState getExecutionState();
+
+	/**
+	 * Returns the timestamp for the given {@link ExecutionState}.
+	 *
+	 * @param state state for which the timestamp should be returned
+	 * @return timestamp for the given state
+	 */
+	long getStateTimestamp(ExecutionState state);
+
+	/**
+	 * Returns the exception that caused the job to fail. This is the first root exception
+	 * that was not recoverable and triggered job failure.
+	 *
+	 * @return failure exception as a string, or {@code "(null)"}
+	 */
+	String getFailureCauseAsString();
+
+	/**
+	 * Returns the {@link TaskManagerLocation} for this execution vertex.
+	 *
+	 * @return taskmanager location for this execution vertex.
+	 */
+	TaskManagerLocation getCurrentAssignedResourceLocation();
+
+	/**
+	 * Returns the execution for the given attempt number.
+	 *
+	 * @param attemptNumber attempt number of execution to be returned
+	 * @return execution for the given attempt number
+	 */
+	AccessExecution getPriorExecutionAttempt(int attemptNumber);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
new file mode 100644
index 0000000..0b2992f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class ArchivedExecution implements AccessExecution, Serializable {
+	private static final long serialVersionUID = 4817108757483345173L;
+	// --------------------------------------------------------------------------------------------
+
+	private final ExecutionAttemptID attemptId;
+
+	private final long[] stateTimestamps;
+
+	private final int attemptNumber;
+
+	private final ExecutionState state;
+
+	private final String failureCause;          // once assigned, never changes
+
+	private final TaskManagerLocation assignedResourceLocation; // for the archived execution
+
+	/* User-defined accumulators in stringified form, read from the Execution when this archive is constructed */
+	private final StringifiedAccumulatorResult[] userAccumulators;
+
+	/* Internal accumulators, read from the Execution when this archive is constructed */
+	private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+	private final int parallelSubtaskIndex;
+
+	public ArchivedExecution(Execution execution) {
+		this.userAccumulators = execution.getUserAccumulatorsStringified();
+		this.flinkAccumulators = execution.getFlinkAccumulators();
+		this.attemptId = execution.getAttemptId();
+		this.attemptNumber = execution.getAttemptNumber();
+		this.stateTimestamps = execution.getStateTimestamps();
+		this.parallelSubtaskIndex = execution.getVertex().getParallelSubtaskIndex();
+		this.state = execution.getState();
+		this.failureCause = ExceptionUtils.stringifyException(execution.getFailureCause());
+		this.assignedResourceLocation = execution.getAssignedResourceLocation();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//   Accessors
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public ExecutionAttemptID getAttemptId() {
+		return attemptId;
+	}
+
+	@Override
+	public int getAttemptNumber() {
+		return attemptNumber;
+	}
+
+	@Override
+	public long[] getStateTimestamps() {
+		return stateTimestamps;
+	}
+
+	@Override
+	public ExecutionState getState() {
+		return state;
+	}
+
+	@Override
+	public TaskManagerLocation getAssignedResourceLocation() {
+		return assignedResourceLocation;
+	}
+
+	@Override
+	public String getFailureCauseAsString() {
+		return failureCause;
+	}
+
+	@Override
+	public long getStateTimestamp(ExecutionState state) {
+		return this.stateTimestamps[state.ordinal()];
+	}
+
+	@Override
+	public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
+		return userAccumulators;
+	}
+
+	@Override
+	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
+		return flinkAccumulators;
+	}
+
+	@Override
+	public int getParallelSubtaskIndex() {
+		return parallelSubtaskIndex;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
new file mode 100644
index 0000000..493825a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * A serializable, read-only representation of an execution graph.
+ *
+ * <p>Every value is supplied through the constructor and stored as passed (no defensive
+ * copies are made); the accessors return those stored values. {@link #isArchived()} always
+ * returns {@code true}.
+ */
+public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable {
+
+	private static final long serialVersionUID = 7231383912742578428L;
+
+	// --------------------------------------------------------------------------------------------
+
+	/** The ID of the job this graph has been built for. */
+	private final JobID jobID;
+
+	/** The name of the original job graph. */
+	private final String jobName;
+
+	/** All job vertices that are part of this graph. */
+	private final Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
+
+	/** All vertices, in the order in which they were created. */
+	private final List<ArchivedExecutionJobVertex> verticesInCreationOrder;
+
+	/**
+	 * Timestamps (in milliseconds, as returned by {@code System.currentTimeMillis()}) when
+	 * the execution graph transitioned into a certain state. The index into this array is the
+	 * ordinal of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is
+	 * at {@code stateTimestamps[RUNNING.ordinal()]}.
+	 */
+	private final long[] stateTimestamps;
+
+	// ------ Execution status. These fields are final and assigned once in the constructor -------
+
+	/** Status of the job execution. */
+	private final JobStatus state;
+
+	/**
+	 * String form of the exception that caused the job to fail. This is the first root
+	 * exception that was not recoverable and triggered job failure.
+	 */
+	private final String failureCause;
+
+	// ------ Fields that are only relevant for archived execution graphs ------------
+
+	/** The JSON plan returned by {@link #getJsonPlan()}. */
+	private final String jsonPlan;
+
+	/** Stringified user accumulators of the job. */
+	private final StringifiedAccumulatorResult[] archivedUserAccumulators;
+
+	/** The archived execution config of the job. */
+	private final ArchivedExecutionConfig archivedExecutionConfig;
+
+	/** The value returned by {@link #isStoppable()}. */
+	private final boolean isStoppable;
+
+	/** Serialized user accumulators, keyed by accumulator name. */
+	private final Map<String, SerializedValue<Object>> serializedUserAccumulators;
+
+	/** The archived checkpoint stats tracker returned by {@link #getCheckpointStatsTracker()}. */
+	private final ArchivedCheckpointStatsTracker tracker;
+
+	/**
+	 * Creates an archived graph from already-archived parts. All arguments are stored as
+	 * passed; none of them is copied or validated.
+	 *
+	 * @param jobID                      ID of the job
+	 * @param jobName                    name of the job
+	 * @param tasks                      archived job vertices, keyed by their ID
+	 * @param verticesInCreationOrder    archived job vertices in creation order
+	 * @param stateTimestamps            timestamps indexed by {@code JobStatus.ordinal()}
+	 * @param state                      status of the job
+	 * @param failureCause               string form of the failure cause
+	 * @param jsonPlan                   JSON plan of the job
+	 * @param archivedUserAccumulators   stringified user accumulators
+	 * @param serializedUserAccumulators serialized user accumulators, keyed by name
+	 * @param executionConfig            archived execution config
+	 * @param isStoppable                whether the job is stoppable
+	 * @param tracker                    archived checkpoint stats tracker
+	 */
+	public ArchivedExecutionGraph(
+		JobID jobID,
+		String jobName,
+		Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
+		List<ArchivedExecutionJobVertex> verticesInCreationOrder,
+		long[] stateTimestamps,
+		JobStatus state,
+		String failureCause,
+		String jsonPlan,
+		StringifiedAccumulatorResult[] archivedUserAccumulators,
+		Map<String, SerializedValue<Object>> serializedUserAccumulators,
+		ArchivedExecutionConfig executionConfig,
+		boolean isStoppable,
+		ArchivedCheckpointStatsTracker tracker
+	) {
+		this.jobID = jobID;
+		this.jobName = jobName;
+		this.tasks = tasks;
+		this.verticesInCreationOrder = verticesInCreationOrder;
+		this.stateTimestamps = stateTimestamps;
+		this.state = state;
+		this.failureCause = failureCause;
+		this.jsonPlan = jsonPlan;
+		this.archivedUserAccumulators = archivedUserAccumulators;
+		this.serializedUserAccumulators = serializedUserAccumulators;
+		this.archivedExecutionConfig = executionConfig;
+		this.isStoppable = isStoppable;
+		this.tracker = tracker;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Returns the stored JSON plan. */
+	@Override
+	public String getJsonPlan() {
+		return jsonPlan;
+	}
+
+	/** Returns the ID of the job. */
+	@Override
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	/** Returns the name of the job. */
+	@Override
+	public String getJobName() {
+		return jobName;
+	}
+
+	/** Returns the stored job status. */
+	@Override
+	public JobStatus getState() {
+		return state;
+	}
+
+	/** Returns the stored string form of the failure cause. */
+	@Override
+	public String getFailureCauseAsString() {
+		return failureCause;
+	}
+
+	/**
+	 * Looks up an archived job vertex by its ID.
+	 *
+	 * @param id the ID of the job vertex
+	 * @return the result of the map lookup, i.e. {@code null} if the map has no value for the ID
+	 */
+	@Override
+	public ArchivedExecutionJobVertex getJobVertex(JobVertexID id) {
+		return this.tasks.get(id);
+	}
+
+	/** Returns an unmodifiable view of all archived job vertices, keyed by their ID. */
+	@Override
+	public Map<JobVertexID, AccessExecutionJobVertex> getAllVertices() {
+		return Collections.<JobVertexID, AccessExecutionJobVertex>unmodifiableMap(this.tasks);
+	}
+
+	/**
+	 * Returns the archived job vertices in creation order.
+	 *
+	 * <p>The returned iterable is index based: the number of elements is captured when this
+	 * method is called, and every iterator it hands out visits exactly that many elements.
+	 * The iterators do not support {@code remove()}.
+	 */
+	@Override
+	public Iterable<ArchivedExecutionJobVertex> getVerticesTopologically() {
+		// captured once, so all iterators of the returned Iterable share the same bound
+		final int numElements = this.verticesInCreationOrder.size();
+
+		return new Iterable<ArchivedExecutionJobVertex>() {
+			@Override
+			public Iterator<ArchivedExecutionJobVertex> iterator() {
+				return new Iterator<ArchivedExecutionJobVertex>() {
+					private int pos = 0;
+
+					@Override
+					public boolean hasNext() {
+						return pos < numElements;
+					}
+
+					@Override
+					public ArchivedExecutionJobVertex next() {
+						if (hasNext()) {
+							return verticesInCreationOrder.get(pos++);
+						} else {
+							throw new NoSuchElementException();
+						}
+					}
+
+					@Override
+					public void remove() {
+						throw new UnsupportedOperationException();
+					}
+				};
+			}
+		};
+	}
+
+	/**
+	 * Returns all archived execution vertices, job vertex by job vertex, following the order
+	 * of {@link #getVerticesTopologically()}.
+	 */
+	@Override
+	public Iterable<ArchivedExecutionVertex> getAllExecutionVertices() {
+		return new Iterable<ArchivedExecutionVertex>() {
+			@Override
+			public Iterator<ArchivedExecutionVertex> iterator() {
+				return new AllVerticesIterator(getVerticesTopologically().iterator());
+			}
+		};
+	}
+
+	/**
+	 * Returns the timestamp recorded for the given job status.
+	 *
+	 * @param status the status to look up; its ordinal is used as the array index
+	 * @return the timestamp stored at {@code status.ordinal()}
+	 */
+	@Override
+	public long getStatusTimestamp(JobStatus status) {
+		return this.stateTimestamps[status.ordinal()];
+	}
+
+	/** Returns the archived checkpoint stats tracker that was passed to the constructor. */
+	@Override
+	public CheckpointStatsTracker getCheckpointStatsTracker() {
+		return tracker;
+	}
+
+	/**
+	 * Gets the internal flink accumulator map of maps which contains some metrics.
+	 *
+	 * <p>A new map is built on every call. It holds one entry per execution vertex, keyed by
+	 * the attempt ID of that vertex' current execution attempt.
+	 *
+	 * @return A map of accumulators for every executed task.
+	 */
+	@Override
+	public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators() {
+		Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators =
+			new HashMap<>();
+
+		for (ArchivedExecutionVertex vertex : getAllExecutionVertices()) {
+			// look the attempt up once; it supplies both the key and the value
+			ArchivedExecution attempt = vertex.getCurrentExecutionAttempt();
+			flinkAccumulators.put(attempt.getAttemptId(), attempt.getFlinkAccumulators());
+		}
+
+		return flinkAccumulators;
+	}
+
+	/** Always returns {@code true}. */
+	@Override
+	public boolean isArchived() {
+		return true;
+	}
+
+	/**
+	 * Returns the stringified user accumulators; this is the same array that
+	 * {@link #getAccumulatorResultsStringified()} returns.
+	 */
+	public StringifiedAccumulatorResult[] getUserAccumulators() {
+		return archivedUserAccumulators;
+	}
+
+	/** Returns the archived execution config that was passed to the constructor. */
+	public ArchivedExecutionConfig getArchivedExecutionConfig() {
+		return archivedExecutionConfig;
+	}
+
+	/** Returns the stoppable flag that was passed to the constructor. */
+	@Override
+	public boolean isStoppable() {
+		return isStoppable;
+	}
+
+	/** Returns the stringified user accumulators (the stored array, not a copy). */
+	@Override
+	public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
+		return archivedUserAccumulators;
+	}
+
+	/** Returns the serialized user accumulators (the stored map, not a copy). */
+	@Override
+	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
+		return serializedUserAccumulators;
+	}
+
+	/**
+	 * Iterator that flattens an iterator over archived job vertices into an iterator over
+	 * their task vertices.
+	 *
+	 * <p>Declared {@code static} because it only works on the iterator handed to its
+	 * constructor and needs no reference to the enclosing graph.
+	 */
+	static class AllVerticesIterator implements Iterator<ArchivedExecutionVertex> {
+
+		private final Iterator<ArchivedExecutionJobVertex> jobVertices;
+
+		/** Task vertices of the job vertex currently being traversed, or null if none is loaded. */
+		private ArchivedExecutionVertex[] currVertices;
+
+		/** Index of the next element to return from {@link #currVertices}. */
+		private int currPos;
+
+
+		public AllVerticesIterator(Iterator<ArchivedExecutionJobVertex> jobVertices) {
+			this.jobVertices = jobVertices;
+		}
+
+
+		@Override
+		public boolean hasNext() {
+			// loops so that job vertices with an empty task vertex array are skipped
+			while (true) {
+				if (currVertices != null) {
+					if (currPos < currVertices.length) {
+						return true;
+					} else {
+						currVertices = null;
+					}
+				} else if (jobVertices.hasNext()) {
+					currVertices = jobVertices.next().getTaskVertices();
+					currPos = 0;
+				} else {
+					return false;
+				}
+			}
+		}
+
+		@Override
+		public ArchivedExecutionVertex next() {
+			if (hasNext()) {
+				return currVertices[currPos++];
+			} else {
+				throw new NoSuchElementException();
+			}
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
new file mode 100644
index 0000000..4857bf5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import scala.Option;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregateJobVertexState;
+
+/**
+ * A serializable representation of an {@link ExecutionJobVertex}.
+ *
+ * <p>All values are read from the source job vertex in the constructor; the accessors
+ * return those stored values.
+ */
+public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Serializable {
+
+	private static final long serialVersionUID = -5768187638639437957L;
+
+	/** Archived versions of the source job vertex' task vertices, in the same order. */
+	private final ArchivedExecutionVertex[] taskVertices;
+
+	private final JobVertexID id;
+
+	private final String name;
+
+	private final int parallelism;
+
+	private final int maxParallelism;
+
+	/** The aggregated metric accumulators, as returned by the source job vertex. */
+	private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> aggregatedMetricAccumulators;
+
+	/** Checkpoint stats of this operator; empty if the graph had no checkpoint stats tracker. */
+	private final Option<OperatorCheckpointStats> checkpointStats;
+
+	/** The aggregated, stringified user accumulators, as returned by the source job vertex. */
+	private final StringifiedAccumulatorResult[] archivedUserAccumulators;
+
+	/**
+	 * Creates the archive by reading all values from the given job vertex.
+	 *
+	 * @param jobVertex the job vertex to archive
+	 */
+	public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
+		// fetch the task vertices once instead of once per loop iteration
+		ExecutionVertex[] sourceVertices = jobVertex.getTaskVertices();
+		this.taskVertices = new ArchivedExecutionVertex[sourceVertices.length];
+		for (int x = 0; x < sourceVertices.length; x++) {
+			this.taskVertices[x] = sourceVertices[x].archive();
+		}
+
+		this.aggregatedMetricAccumulators = jobVertex.getAggregatedMetricAccumulators();
+
+		// the job vertex supplies the aggregated accumulators; no separate merge is needed here
+		this.archivedUserAccumulators = jobVertex.getAggregatedUserAccumulatorsStringified();
+
+		this.id = jobVertex.getJobVertexId();
+		this.name = jobVertex.getJobVertex().getName();
+		this.parallelism = jobVertex.getParallelism();
+		this.maxParallelism = jobVertex.getMaxParallelism();
+
+		CheckpointStatsTracker tracker = jobVertex.getGraph().getCheckpointStatsTracker();
+		this.checkpointStats = tracker != null
+			? tracker.getOperatorStats(this.id)
+			: Option.<OperatorCheckpointStats>empty();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//   Accessors
+	// --------------------------------------------------------------------------------------------
+
+	/** Returns the name of the job vertex. */
+	@Override
+	public String getName() {
+		return name;
+	}
+
+	/** Returns the parallelism of the job vertex. */
+	@Override
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	/** Returns the maximum parallelism of the job vertex. */
+	@Override
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	/** Returns the ID of the job vertex. */
+	@Override
+	public JobVertexID getJobVertexId() {
+		return id;
+	}
+
+	/** Returns the archived task vertices (the stored array, not a copy). */
+	@Override
+	public ArchivedExecutionVertex[] getTaskVertices() {
+		return taskVertices;
+	}
+
+	/**
+	 * Computes the aggregate state of this job vertex.
+	 *
+	 * <p>Counts the task vertices per {@link ExecutionState} (indexed by ordinal) and passes
+	 * the counts together with the parallelism to
+	 * {@code ExecutionJobVertex.getAggregateJobVertexState}.
+	 *
+	 * @return the aggregate state determined by that helper
+	 */
+	@Override
+	public ExecutionState getAggregateState() {
+		int[] num = new int[ExecutionState.values().length];
+		for (ArchivedExecutionVertex vertex : this.taskVertices) {
+			num[vertex.getExecutionState().ordinal()]++;
+		}
+
+		return getAggregateJobVertexState(num, parallelism);
+	}
+
+	/** Returns the aggregated metric accumulators (the stored map, not a copy). */
+	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators() {
+		return this.aggregatedMetricAccumulators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Checkpoint statistics and user accumulators
+	// --------------------------------------------------------------------------------------------
+
+	/** Returns the checkpoint stats determined at construction time; may be empty. */
+	@Override
+	public Option<OperatorCheckpointStats> getCheckpointStats() {
+		return checkpointStats;
+	}
+
+	/** Returns the aggregated, stringified user accumulators (the stored array, not a copy). */
+	@Override
+	public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
+		return archivedUserAccumulators;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
new file mode 100644
index 0000000..e1fb11a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
+
+	private static final long serialVersionUID = -6708241535015028576L;
+	private final int subTaskIndex;
+
+	private final List<ArchivedExecution> priorExecutions;
+
+	/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
+	private final String taskNameWithSubtask;
+
+	private final ArchivedExecution currentExecution;    // this field must never be null
+
+	public ArchivedExecutionVertex(ExecutionVertex vertex) {
+		this.subTaskIndex = vertex.getParallelSubtaskIndex();
+		this.priorExecutions = new ArrayList<>();
+		for (Execution priorExecution : vertex.getPriorExecutions()) {
+			priorExecutions.add(priorExecution.archive());
+		}
+		this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
+		this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//   Accessors
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String getTaskNameWithSubtaskIndex() {
+		return this.taskNameWithSubtask;
+	}
+
+	@Override
+	public int getParallelSubtaskIndex() {
+		return this.subTaskIndex;
+	}
+
+	@Override
+	public ArchivedExecution getCurrentExecutionAttempt() {
+		return currentExecution;
+	}
+
+	@Override
+	public ExecutionState getExecutionState() {
+		return currentExecution.getState();
+	}
+
+	@Override
+	public long getStateTimestamp(ExecutionState state) {
+		return currentExecution.getStateTimestamp(state);
+	}
+
+	@Override
+	public String getFailureCauseAsString() {
+		return currentExecution.getFailureCauseAsString();
+	}
+
+	@Override
+	public TaskManagerLocation getCurrentAssignedResourceLocation() {
+		return currentExecution.getAssignedResourceLocation();
+	}
+
+	@Override
+	public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) {
+		if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
+			return priorExecutions.get(attemptNumber);
+		} else {
+			throw new IllegalArgumentException("attempt does not exist");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b92e3af..0b56931 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -102,7 +103,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
  * actions if it is not. Many actions are also idempotent (like canceling).
  */
-public class Execution {
+public class Execution implements AccessExecution, Archiveable<ArchivedExecution> {
 
 	private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
@@ -188,14 +189,17 @@ public class Execution {
 		return vertex;
 	}
 
+	@Override
 	public ExecutionAttemptID getAttemptId() {
 		return attemptId;
 	}
 
+	@Override
 	public int getAttemptNumber() {
 		return attemptNumber;
 	}
 
+	@Override
 	public ExecutionState getState() {
 		return state;
 	}
@@ -204,6 +208,7 @@ public class Execution {
 		return assignedResource;
 	}
 
+	@Override
 	public TaskManagerLocation getAssignedResourceLocation() {
 		return assignedResourceLocation;
 	}
@@ -212,10 +217,17 @@ public class Execution {
 		return failureCause;
 	}
 
+	/**
+	 * Returns the failure cause of this execution in string form, produced by passing
+	 * {@link #getFailureCause()} to {@code ExceptionUtils.stringifyException}.
+	 *
+	 * @return the stringified failure cause
+	 */
+	@Override
+	public String getFailureCauseAsString() {
+		return ExceptionUtils.stringifyException(getFailureCause());
+	}
+
+	@Override
 	public long[] getStateTimestamps() {
 		return stateTimestamps;
 	}
 
+	@Override
 	public long getStateTimestamp(ExecutionState state) {
 		return this.stateTimestamps[state.ordinal()];
 	}
@@ -237,21 +249,6 @@ public class Execution {
 	}
 
 	/**
-	 * This method cleans fields that are irrelevant for the archived execution attempt.
-	 */
-	public void prepareForArchiving() {
-		if (assignedResource != null && assignedResource.isAlive()) {
-			throw new IllegalStateException("Cannot archive Execution while the assigned resource is still running.");
-		}
-		assignedResource = null;
-
-		executionContext = null;
-
-		partialInputChannelDeploymentDescriptors.clear();
-		partialInputChannelDeploymentDescriptors = null;
-	}
-
-	/**
 	 * Sets the initial state for the execution. The serialized state is then shipped via the
 	 * {@link TaskDeploymentDescriptor} to the TaskManagers.
 	 *
@@ -1055,14 +1052,21 @@ public class Execution {
 		return userAccumulators;
 	}
 
+	@Override
 	public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
 		return StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
 	}
 
+	@Override
 	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
 		return flinkAccumulators;
 	}
 
+	/**
+	 * Returns the parallel subtask index of the vertex this execution belongs to.
+	 *
+	 * @return the subtask index reported by {@code getVertex()}
+	 */
+	@Override
+	public int getParallelSubtaskIndex() {
+		return getVertex().getParallelSubtaskIndex();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Standard utilities
 	// ------------------------------------------------------------------------
@@ -1072,4 +1076,9 @@ public class Execution {
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),
 				(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
 	}
+
+	/**
+	 * Creates an archived representation of this execution by handing this object to the
+	 * {@link ArchivedExecution} constructor.
+	 *
+	 * @return a new {@link ArchivedExecution} built from this execution
+	 */
+	@Override
+	public ArchivedExecution archive() {
+		return new ArchivedExecution(this);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 10f0e88..aa9406c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -32,14 +33,16 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
-import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -58,8 +61,10 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
+import scala.Option;
 
 import java.io.IOException;
 import java.net.URL;
@@ -102,7 +107,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *         address the message receiver.</li>
  * </ul>
  */
-public class ExecutionGraph {
+public class ExecutionGraph implements AccessExecutionGraph, Archiveable<ArchivedExecutionGraph> {
 
 	private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
@@ -180,9 +185,6 @@ public class ExecutionGraph {
 	 * from results than need to be materialized. */
 	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
-	/** Flag to indicate whether the Graph has been archived */
-	private boolean isArchived = false;
-
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
 	/** Current status of the job execution */
@@ -222,9 +224,6 @@ public class ExecutionGraph {
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private String jsonPlan;
 
-	/** Serializable summary of all job config values, e.g. for web interface */
-	private ExecutionConfigSummary executionConfigSummary;
-
 	// --------------------------------------------------------------------------------------------
 	//   Constructors
 	// --------------------------------------------------------------------------------------------
@@ -304,16 +303,6 @@ public class ExecutionGraph {
 		metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
 
 		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
-
-		// create a summary of all relevant data accessed in the web interface's JobConfigHandler
-		try {
-			ExecutionConfig executionConfig = serializedConfig.deserializeValue(userClassLoader);
-			if (executionConfig != null) {
-				this.executionConfigSummary = new ExecutionConfigSummary(executionConfig);
-			}
-		} catch (IOException | ClassNotFoundException e) {
-			LOG.error("Couldn't create ExecutionConfigSummary for job {} ", jobID, e);
-		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -344,8 +333,9 @@ public class ExecutionGraph {
 		return scheduleMode;
 	}
 
+	@Override
 	public boolean isArchived() {
-		return isArchived;
+		return false;
 	}
 
 	public void enableSnapshotCheckpointing(
@@ -434,6 +424,7 @@ public class ExecutionGraph {
 		return restartStrategy;
 	}
 
+	@Override
 	public CheckpointStatsTracker getCheckpointStatsTracker() {
 		return checkpointStatsTracker;
 	}
@@ -484,6 +475,7 @@ public class ExecutionGraph {
 		this.jsonPlan = jsonPlan;
 	}
 
+	@Override
 	public String getJsonPlan() {
 		return jsonPlan;
 	}
@@ -492,14 +484,17 @@ public class ExecutionGraph {
 		return slotProvider;
 	}
 
+	@Override
 	public JobID getJobID() {
 		return jobID;
 	}
 
+	@Override
 	public String getJobName() {
 		return jobName;
 	}
 
+	@Override
 	public boolean isStoppable() {
 		return this.isStoppable;
 	}
@@ -512,6 +507,7 @@ public class ExecutionGraph {
 		return this.userClassLoader;
 	}
 
+	@Override
 	public JobStatus getState() {
 		return state;
 	}
@@ -520,14 +516,22 @@ public class ExecutionGraph {
 		return failureCause;
 	}
 
+	/**
+	 * Returns the failure cause of this graph in string form, produced by passing the
+	 * {@code failureCause} field to {@code ExceptionUtils.stringifyException}.
+	 *
+	 * @return the stringified failure cause
+	 */
+	@Override
+	public String getFailureCauseAsString() {
+		return ExceptionUtils.stringifyException(failureCause);
+	}
+
+	@Override
 	public ExecutionJobVertex getJobVertex(JobVertexID id) {
 		return this.tasks.get(id);
 	}
 
+	@Override
 	public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
 		return Collections.unmodifiableMap(this.tasks);
 	}
 
+	@Override
 	public Iterable<ExecutionJobVertex> getVerticesTopologically() {
 		// we return a specific iterator that does not fail with concurrent modifications
 		// the list is append only, so it is safe for that
@@ -566,6 +570,7 @@ public class ExecutionGraph {
 		return Collections.unmodifiableMap(this.intermediateResults);
 	}
 
+	@Override
 	public Iterable<ExecutionVertex> getAllExecutionVertices() {
 		return new Iterable<ExecutionVertex>() {
 			@Override
@@ -575,6 +580,7 @@ public class ExecutionGraph {
 		};
 	}
 
+	@Override
 	public long getStatusTimestamp(JobStatus status) {
 		return this.stateTimestamps[status.ordinal()];
 	}
@@ -592,6 +598,7 @@ public class ExecutionGraph {
 	 * Gets the internal flink accumulator map of maps which contains some metrics.
 	 * @return A map of accumulators for every executed task.
 	 */
+	@Override
 	public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> getFlinkAccumulators() {
 		Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators =
 				new HashMap<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>();
@@ -627,6 +634,7 @@ public class ExecutionGraph {
 	 * @return The accumulator map with serialized accumulator values.
 	 * @throws IOException
 	 */
+	@Override
 	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
 
 		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
@@ -643,6 +651,7 @@ public class ExecutionGraph {
 	 * Returns the a stringified version of the user-defined accumulators.
 	 * @return an Array containing the StringifiedAccumulatorResult objects
 	 */
+	@Override
 	public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
 		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
 		return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
@@ -926,51 +935,21 @@ public class ExecutionGraph {
 	}
 
 	/**
-	 * This method cleans fields that are irrelevant for the archived execution attempt.
+	 * Returns the serializable ArchivedExecutionConfig
+	 * @return ArchivedExecutionConfig which may be null in case of errors
 	 */
-	public void prepareForArchiving() {
-		if (!state.isGloballyTerminalState()) {
-			throw new IllegalStateException("Can only archive the job from a terminal state");
-		}
-
-		// clear the non-serializable fields
-		restartStrategy = null;
-		slotProvider = null;
-		checkpointCoordinator = null;
-		executionContext = null;
-		kvStateLocationRegistry = null;
-
-		for (ExecutionJobVertex vertex : verticesInCreationOrder) {
-			vertex.prepareForArchiving();
-		}
-
-		intermediateResults.clear();
-		currentExecutions.clear();
-		requiredJarFiles.clear();
-		requiredClasspaths.clear();
-		jobStatusListeners.clear();
-		executionListeners.clear();
-
-		if (userClassLoader instanceof FlinkUserCodeClassLoader) {
-			try {
-				// close the classloader to free space of user jars immediately
-				// otherwise we have to wait until garbage collection
-				((FlinkUserCodeClassLoader) userClassLoader).close();
-			} catch (IOException e) {
-				LOG.warn("Failed to close the user classloader for job {}", jobID, e);
+	@Override
+	public ArchivedExecutionConfig getArchivedExecutionConfig() {
+		// create a summary of all relevant data accessed in the web interface's JobConfigHandler
+		try {
+			ExecutionConfig executionConfig = getSerializedExecutionConfig().deserializeValue(userClassLoader);
+			if (executionConfig != null) {
+				return executionConfig.archive();
 			}
-		}
-		userClassLoader = null;
-
-		isArchived = true;
-	}
-
-	/**
-	 * Returns the serializable ExecutionConfigSummary
-	 * @return ExecutionConfigSummary which may be null in case of errors
-	 */
-	public ExecutionConfigSummary getExecutionConfigSummary() {
-		return executionConfigSummary;
+		} catch (IOException | ClassNotFoundException e) {
+			LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", jobID, e);
+		}
+		return null; // reached when deserialization failed or produced no ExecutionConfig
 	}
 
 	/**
@@ -1282,4 +1261,53 @@ public class ExecutionGraph {
 			}
 		}
 	}
+
+	/** Builds an {@link ArchivedExecutionGraph} from the current state of this graph. */
+	@Override
+	public ArchivedExecutionGraph archive() {
+		Map<JobVertexID, OperatorCheckpointStats> operatorStats = new HashMap<>();
+		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>();
+		List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>();
+		for (ExecutionJobVertex task : verticesInCreationOrder) {
+			ArchivedExecutionJobVertex archivedTask = task.archive();
+			archivedVerticesInCreationOrder.add(archivedTask);
+			archivedTasks.put(task.getJobVertexId(), archivedTask);
+			Option<OperatorCheckpointStats> statsOption = task.getCheckpointStats();
+			if (statsOption.isDefined()) {
+				operatorStats.put(task.getJobVertexId(), statsOption.get());
+			}
+		}
+
+		Option<JobCheckpointStats> jobStats;
+		if (getCheckpointStatsTracker() == null) {
+			jobStats = Option.empty();
+		} else {
+			jobStats = getCheckpointStatsTracker().getJobStats();
+		}
+		ArchivedCheckpointStatsTracker statsTracker = new ArchivedCheckpointStatsTracker(jobStats, operatorStats);
+
+		Map<String, SerializedValue<Object>> serializedUserAccumulators;
+		try {
+			serializedUserAccumulators = getAccumulatorsSerialized();
+		} catch (Exception e) {
+			LOG.warn("Error occurred while archiving user accumulators.", e);
+			serializedUserAccumulators = Collections.emptyMap();
+		}
+
+		return new ArchivedExecutionGraph(
+			getJobID(),
+			getJobName(),
+			archivedTasks,
+			archivedVerticesInCreationOrder,
+			stateTimestamps.clone(), // copy: the archive must not alias this graph's mutable timestamp array
+			getState(),
+			getFailureCauseAsString(),
+			getJsonPlan(),
+			getAccumulatorResultsStringified(),
+			serializedUserAccumulators,
+			getArchivedExecutionConfig(),
+			isStoppable(),
+			statsTracker
+		);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index ead0852..e7f16a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -28,7 +28,10 @@ import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -43,6 +46,7 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
+import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
@@ -51,7 +55,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ExecutionJobVertex {
+public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable<ArchivedExecutionJobVertex> {
 
 	/** Use the same log for all ExecutionGraph classes */
 	private static final Logger LOG = ExecutionGraph.LOG;
@@ -197,10 +201,17 @@ public class ExecutionJobVertex {
 		return jobVertex;
 	}
 
+	@Override
+	public String getName() {
+		return getJobVertex().getName(); // reports the name of the JobVertex returned by getJobVertex()
+	}
+
+	@Override
 	public int getParallelism() {
 		return parallelism;
 	}
 
+	@Override
 	public int getMaxParallelism() {
 		return maxParallelism;
 	}
@@ -209,10 +220,12 @@ public class ExecutionJobVertex {
 		return graph.getJobID();
 	}
 	
+	@Override
 	public JobVertexID getJobVertexId() {
 		return jobVertex.getID();
 	}
 	
+	@Override
 	public ExecutionVertex[] getTaskVertices() {
 		return taskVertices;
 	}
@@ -241,6 +254,7 @@ public class ExecutionJobVertex {
 		return numSubtasksInFinalState == parallelism;
 	}
 	
+	@Override
 	public ExecutionState getAggregateState() {
 		int[] num = new int[ExecutionState.values().length];
 		for (ExecutionVertex vertex : this.taskVertices) {
@@ -250,6 +264,16 @@ public class ExecutionJobVertex {
 		return getAggregateJobVertexState(num, parallelism);
 	}
 	
+	@Override
+	public Option<OperatorCheckpointStats> getCheckpointStats() {
+		// a graph without a stats tracker has no operator stats to report
+		final CheckpointStatsTracker statsTracker = getGraph().getCheckpointStatsTracker();
+		if (statsTracker != null) {
+			return statsTracker.getOperatorStats(getJobVertexId());
+		}
+		return Option.empty();
+	}
+
 	//---------------------------------------------------------------------------------------------
 	
 	public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
@@ -371,36 +395,6 @@ public class ExecutionJobVertex {
 		}
 	}
 	
-	/**
-	 * This method cleans fields that are irrelevant for the archived execution attempt.
-	 */
-	public void prepareForArchiving() {
-		
-		for (ExecutionVertex vertex : taskVertices) {
-			vertex.prepareForArchiving();
-		}
-		
-		// clear intermediate results
-		inputs.clear();
-		producedDataSets = null;
-		
-		// reset shared groups
-		if (slotSharingGroup != null) {
-			slotSharingGroup.clearTaskAssignment();
-		}
-		if (coLocationGroup != null) {
-			coLocationGroup.resetConstraints();
-		}
-		
-		// reset splits and split assigner
-		splitAssigner = null;
-		if (inputSplits != null) {
-			for (int i = 0; i < inputSplits.length; i++) {
-				inputSplits[i] = null;
-			}
-		}
-	}
-	
 	//---------------------------------------------------------------------------------------------
 	//  Notifications
 	//---------------------------------------------------------------------------------------------
@@ -627,4 +621,9 @@ public class ExecutionJobVertex {
 			return ExecutionState.CREATED;
 		}
 	}
+
+	@Override
+	public ArchivedExecutionJobVertex archive() {
+		return new ArchivedExecutionJobVertex(this); // the archived form is built by the constructor from this live vertex
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 4837803..96af91e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -72,7 +74,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
  * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of
  * which time it spawns an {@link Execution}.
  */
-public class ExecutionVertex {
+public class ExecutionVertex implements AccessExecutionVertex, Archiveable<ArchivedExecutionVertex> {
 
 	private static final Logger LOG = ExecutionGraph.LOG;
 
@@ -176,6 +178,7 @@ public class ExecutionVertex {
 		return this.jobVertex.getJobVertex().getName();
 	}
 
+	@Override
 	public String getTaskNameWithSubtaskIndex() {
 		return this.taskNameWithSubtask;
 	}
@@ -188,6 +191,7 @@ public class ExecutionVertex {
 		return this.jobVertex.getMaxParallelism();
 	}
 
+	@Override
 	public int getParallelSubtaskIndex() {
 		return this.subTaskIndex;
 	}
@@ -207,18 +211,26 @@ public class ExecutionVertex {
 		return locationConstraint;
 	}
 
+	@Override
 	public Execution getCurrentExecutionAttempt() {
 		return currentExecution;
 	}
 
+	@Override
 	public ExecutionState getExecutionState() {
 		return currentExecution.getState();
 	}
 
+	@Override
 	public long getStateTimestamp(ExecutionState state) {
 		return currentExecution.getStateTimestamp(state);
 	}
 
+	@Override
+	public String getFailureCauseAsString() {
+		return ExceptionUtils.stringifyException(getFailureCause()); // string form of the Throwable from getFailureCause()
+	}
+
 	public Throwable getFailureCause() {
 		return currentExecution.getFailureCause();
 	}
@@ -227,10 +239,12 @@ public class ExecutionVertex {
 		return currentExecution.getAssignedResource();
 	}
 
+	@Override
 	public TaskManagerLocation getCurrentAssignedResourceLocation() {
 		return currentExecution.getAssignedResourceLocation();
 	}
 
+	@Override
 	public Execution getPriorExecutionAttempt(int attemptNumber) {
 		if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
 			return priorExecutions.get(attemptNumber);
@@ -240,6 +254,10 @@ public class ExecutionVertex {
 		}
 	}
 
+	List<Execution> getPriorExecutions() { // package-private; returns the internal list itself, not a copy
+		return priorExecutions;
+	}
+
 	public ExecutionGraph getExecutionGraph() {
 		return this.jobVertex.getGraph();
 	}
@@ -537,31 +555,6 @@ public class ExecutionVertex {
 		}
 	}
 
-	/**
-	 * This method cleans fields that are irrelevant for the archived execution attempt.
-	 */
-	public void prepareForArchiving() throws IllegalStateException {
-		Execution execution = currentExecution;
-
-		// sanity check
-		if (!execution.isFinished()) {
-			throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state.");
-		}
-
-		// prepare the current execution for archiving
-		execution.prepareForArchiving();
-
-		// prepare previous executions for archiving
-		for (Execution exec : priorExecutions) {
-			exec.prepareForArchiving();
-		}
-
-		// clear the unnecessary fields in this class
-		this.resultPartitions = null;
-		this.inputEdges = null;
-		this.locationConstraint = null;
-	}
-
 	public void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo){
 		getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo);
 	}
@@ -708,4 +701,9 @@ public class ExecutionVertex {
 	public String toString() {
 		return getSimpleName();
 	}
+
+	@Override
+	public ArchivedExecutionVertex archive() {
+		return new ArchivedExecutionVertex(this); // the archived form is built by the constructor from this live vertex
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
deleted file mode 100644
index ad4677f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.executiongraph.archive;
-
-import org.apache.flink.api.common.ExecutionConfig;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Serializable class which is created when archiving the job.
- * It can be used to display job information on the web interface
- * without having to keep the classloader around after job completion.
- */
-public class ExecutionConfigSummary implements Serializable {
-
-	private final String executionMode;
-	private final String restartStrategyDescription;
-	private final int parallelism;
-	private final boolean objectReuseEnabled;
-	private final Map<String, String> globalJobParameters;
-
-	public ExecutionConfigSummary(ExecutionConfig ec) {
-		executionMode = ec.getExecutionMode().name();
-		if (ec.getRestartStrategy() != null) {
-			restartStrategyDescription = ec.getRestartStrategy().getDescription();
-		} else {
-			restartStrategyDescription = "default";
-		}
-		parallelism = ec.getParallelism();
-		objectReuseEnabled = ec.isObjectReuseEnabled();
-		if (ec.getGlobalJobParameters() != null
-				&& ec.getGlobalJobParameters().toMap() != null) {
-			globalJobParameters = ec.getGlobalJobParameters().toMap();
-		} else {
-			globalJobParameters = Collections.emptyMap();
-		}
-	}
-
-	public String getExecutionMode() {
-		return executionMode;
-	}
-
-	public String getRestartStrategyDescription() {
-		return restartStrategyDescription;
-	}
-
-	public int getParallelism() {
-		return parallelism;
-	}
-
-	public boolean getObjectReuseEnabled() {
-		return objectReuseEnabled;
-	}
-
-	public Map<String, String> getGlobalJobParameters() {
-		return globalJobParameters;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 37a91b3..87df0d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -27,9 +27,9 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -164,7 +164,7 @@ public final class WebMonitorUtils {
 		}
 	}
 
-	public static JobDetails createDetailsForJob(ExecutionGraph job) {
+	public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
 		JobStatus status = job.getState();
 
 		long started = job.getStatusTimestamp(JobStatus.CREATED);
@@ -174,11 +174,11 @@ public final class WebMonitorUtils {
 		long lastChanged = 0;
 		int numTotalTasks = 0;
 
-		for (ExecutionJobVertex ejv : job.getVerticesTopologically()) {
-			ExecutionVertex[] vertices = ejv.getTaskVertices();
+		for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
+			AccessExecutionVertex[] vertices = ejv.getTaskVertices();
 			numTotalTasks += vertices.length;
 
-			for (ExecutionVertex vertex : vertices) {
+			for (AccessExecutionVertex vertex : vertices) {
 				ExecutionState state = vertex.getExecutionState();
 				countsPerStatus[state.ordinal()]++;
 				lastChanged = Math.max(lastChanged, vertex.getStateTimestamp(state));

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cca0124..8f3b82a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1685,12 +1685,9 @@ class JobManager(
           }(context.dispatcher))
 
           try {
-            eg.prepareForArchiving()
-
-            archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
+            archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg.archive()))
           } catch {
-            case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
-              "archiving.", t)
+            case t: Throwable => log.error(s"Could not archive the execution graph $eg.", t)
           }
 
           futureOption

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 2d55b26..7f8fcd3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils
 import org.apache.flink.runtime.{FlinkActor, LogMessages}
 import org.apache.flink.runtime.messages.webmonitor._
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, ExecutionGraph}
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
@@ -66,7 +66,7 @@ class MemoryArchivist(private val max_entries: Int)
    * Map of execution graphs belonging to recently started jobs with the time stamp of the last
    * received job event. The insert order is preserved through a LinkedHashMap.
    */
-  protected val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()
+  protected val graphs = mutable.LinkedHashMap[JobID, ArchivedExecutionGraph]()
 
   /* Counters for finished, canceled, and failed jobs */
   private[this] var finishedCnt: Int = 0

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index c4e3f3e..435b736 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.runtime.messages
 
 import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, ExecutionGraph}
 
 /**
  * This object contains the archive specific messages.
  */
 object ArchiveMessages {
   
-  case class ArchiveExecutionGraph(jobID: JobID, graph: ExecutionGraph)
+  case class ArchiveExecutionGraph(jobID: JobID, graph: ArchivedExecutionGraph)
 
   /**
    * Request the currently archived jobs in the archiver. The resulting response is [[ArchivedJobs]]
@@ -44,19 +44,19 @@ object ArchiveMessages {
    */
   case class RequestArchivedJob(jobID: JobID)
 
-  case class ArchivedJob(job: Option[ExecutionGraph])
+  case class ArchivedJob(job: Option[ArchivedExecutionGraph])
 
   /**
    * Response to [[RequestArchivedJobs]] message. The response contains the archived jobs.
    * @param jobs
    */
-  case class ArchivedJobs(jobs: Iterable[ExecutionGraph]){
-    def asJavaIterable: java.lang.Iterable[ExecutionGraph] = {
+  case class ArchivedJobs(jobs: Iterable[ArchivedExecutionGraph]){
+    def asJavaIterable: java.lang.Iterable[ArchivedExecutionGraph] = {
       import scala.collection.JavaConverters._
       jobs.asJava
     }
 
-    def asJavaCollection: java.util.Collection[ExecutionGraph] = {
+    def asJavaCollection: java.util.Collection[ArchivedExecutionGraph] = {
       import scala.collection.JavaConverters._
       jobs.asJavaCollection
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 4cf6a02..3df8c26 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.blob.BlobKey
 import org.apache.flink.runtime.client.{JobStatusMessage, SerializedJobExecutionResult}
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.{Instance, InstanceID}
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID
 import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, JobStatus, JobVertexID}
@@ -371,7 +371,7 @@ object JobManagerMessages {
    * @param jobID
    * @param executionGraph
    */
-  case class JobFound(jobID: JobID, executionGraph: ExecutionGraph) extends JobResponse
+  case class JobFound(jobID: JobID, executionGraph: AccessExecutionGraph) extends JobResponse
 
   /**
    * Denotes that there is no job with [[jobID]] retrievable. This message can be the response of

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index ea4d322..0b2f4f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -81,7 +81,7 @@ public class CoordinatorShutdownTest {
 					new JobManagerMessages.RequestJob(testGraph.getJobID()),
 					timeout);
 			
-			ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
+			ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
 			
 			assertNotNull(graph);
 			graph.waitUntilFinished();
@@ -133,7 +133,7 @@ public class CoordinatorShutdownTest {
 					new JobManagerMessages.RequestJob(testGraph.getJobID()),
 					timeout);
 
-			ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
+			ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
 
 			assertNotNull(graph);
 			graph.waitUntilFinished();