You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/14 11:55:47 UTC
[1/3] flink git commit: [FLINK-4720] Implement archived ExecutionGraph
Repository: flink
Updated Branches:
refs/heads/master f6d866817 -> 21e8e2dcf
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
new file mode 100644
index 0000000..d0566d9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class ArchivedExecutionGraphTest {
+ // IDs of the two job vertices; both are re-assigned with fresh IDs in setupExecutionGraph().
+ private static JobVertexID v1ID = new JobVertexID();
+ private static JobVertexID v2ID = new JobVertexID();
+
+ // Attempt ID of the execution that setupExecutionGraph() attaches accumulators to.
+ private static ExecutionAttemptID executionWithAccumulatorsID;
+
+ // Runtime graph built once in setupExecutionGraph() and archived by the tests.
+ private static ExecutionGraph runtimeGraph;
+
+ /**
+ * Builds the shared runtime {@link ExecutionGraph} that the tests archive.
+ *
+ * <p>The graph consists of two job vertices ("v1" with parallelism 1, "v2" with parallelism 2),
+ * has snapshot checkpointing enabled with a {@link TestCheckpointStatsTracker}, carries
+ * accumulators on the first subtask of v1 and has a failed execution on the first subtask of v2.
+ */
+ @BeforeClass
+ public static void setupExecutionGraph() throws Exception {
+ // -------------------------------------------------------------------------------------------------------------
+ // Setup
+ // -------------------------------------------------------------------------------------------------------------
+
+ v1ID = new JobVertexID();
+ v2ID = new JobVertexID();
+
+ JobVertex v1 = new JobVertex("v1", v1ID);
+ JobVertex v2 = new JobVertex("v2", v2ID);
+
+ v1.setParallelism(1);
+ v2.setParallelism(2);
+
+ List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
+
+ // Non-default settings; compareExecutionGraph() later reads them back through the ArchivedExecutionConfig.
+ ExecutionConfig config = new ExecutionConfig();
+
+ config.setExecutionMode(ExecutionMode.BATCH_FORCED);
+ config.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+ config.setParallelism(4);
+ config.enableObjectReuse();
+ config.setGlobalJobParameters(new TestJobParameters());
+
+ runtimeGraph = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ new JobID(),
+ "test job",
+ new Configuration(),
+ new SerializedValue<>(config),
+ AkkaUtils.getDefaultTimeout(),
+ new NoRestartStrategy());
+ runtimeGraph.attachJobGraph(vertices);
+
+ // Install the stub stats tracker so that job and operator checkpoint stats are available for comparison.
+ runtimeGraph.enableSnapshotCheckpointing(
+ 100,
+ 100,
+ 100,
+ 1,
+ Collections.<ExecutionJobVertex>emptyList(),
+ Collections.<ExecutionJobVertex>emptyList(),
+ Collections.<ExecutionJobVertex>emptyList(),
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1, null),
+ new HeapSavepointStore(),
+ new TestCheckpointStatsTracker());
+
+ Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = new HashMap<>();
+ flinkAccumulators.put(AccumulatorRegistry.Metric.NUM_BYTES_IN, new LongCounter(32));
+
+ Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
+ userAccumulators.put("userAcc", new LongCounter(64));
+
+ // Attach the accumulators to the current attempt of v1's first subtask and remember its attempt ID.
+ Execution executionWithAccumulators = runtimeGraph.getJobVertex(v1ID).getTaskVertices()[0].getCurrentExecutionAttempt();
+ executionWithAccumulators.setAccumulators(flinkAccumulators, userAccumulators);
+ executionWithAccumulatorsID = executionWithAccumulators.getAttemptId();
+
+ // Fail the current attempt of v2's first subtask so that a failure cause is present in the graph.
+ runtimeGraph.getJobVertex(v2ID).getTaskVertices()[0].getCurrentExecutionAttempt().fail(new RuntimeException("This exception was thrown on purpose."));
+ }
+
+ /**
+ * Archives the shared runtime graph and verifies that the archive exposes the same information.
+ */
+ @Test
+ public void testArchive() throws IOException, ClassNotFoundException {
+ compareExecutionGraph(runtimeGraph, runtimeGraph.archive());
+ }
+
+ /**
+ * Archives the shared runtime graph and verifies that the archive survives a serialization round trip.
+ */
+ @Test
+ public void testSerialization() throws IOException, ClassNotFoundException {
+ verifySerializability(runtimeGraph.archive());
+ }
+
+ /**
+ * Asserts that an archived graph exposes the same information as the graph it was created from.
+ *
+ * <p>Compares the graph-level attributes, the job checkpoint stats, the archived execution config,
+ * the accumulators, all job vertices (by ID and in topological order), the operator checkpoint
+ * stats of both test vertices and all execution vertices. Both vertex collections must contain
+ * exactly the same number of elements.
+ *
+ * @param runtimeGraph the reference graph
+ * @param archivedGraph the graph under test; must report {@code isArchived() == true}
+ * @throws IOException if a serialized accumulator cannot be deserialized
+ * @throws ClassNotFoundException if the class of a serialized accumulator cannot be resolved
+ */
+ private static void compareExecutionGraph(AccessExecutionGraph runtimeGraph, AccessExecutionGraph archivedGraph) throws IOException, ClassNotFoundException {
+ assertTrue(archivedGraph.isArchived());
+ // -------------------------------------------------------------------------------------------------------------
+ // ExecutionGraph
+ // -------------------------------------------------------------------------------------------------------------
+ assertEquals(runtimeGraph.getJsonPlan(), archivedGraph.getJsonPlan());
+ assertEquals(runtimeGraph.getJobID(), archivedGraph.getJobID());
+ assertEquals(runtimeGraph.getJobName(), archivedGraph.getJobName());
+ assertEquals(runtimeGraph.getState(), archivedGraph.getState());
+ assertEquals(runtimeGraph.getFailureCauseAsString(), archivedGraph.getFailureCauseAsString());
+ assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CREATED), archivedGraph.getStatusTimestamp(JobStatus.CREATED));
+ assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RUNNING), archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
+ assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILING), archivedGraph.getStatusTimestamp(JobStatus.FAILING));
+ assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILED), archivedGraph.getStatusTimestamp(JobStatus.FAILED));
+ assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CANCELLING), archivedGraph.getStatusTimestamp(JobStatus.CANCELLING));
+ assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CANCELED), archivedGraph.getStatusTimestamp(JobStatus.CANCELED));
+ assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FINISHED), archivedGraph.getStatusTimestamp(JobStatus.FINISHED));
+ assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING), archivedGraph.getStatusTimestamp(JobStatus.RESTARTING));
+ assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED), archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED));
+ assertEquals(runtimeGraph.isStoppable(), archivedGraph.isStoppable());
+
+ // -------------------------------------------------------------------------------------------------------------
+ // JobCheckpointStats
+ // -------------------------------------------------------------------------------------------------------------
+ JobCheckpointStats runtimeStats = runtimeGraph.getCheckpointStatsTracker().getJobStats().get();
+ JobCheckpointStats archivedStats = archivedGraph.getCheckpointStatsTracker().getJobStats().get();
+
+ assertEquals(runtimeStats.getAverageDuration(), archivedStats.getAverageDuration());
+ assertEquals(runtimeStats.getMinDuration(), archivedStats.getMinDuration());
+ assertEquals(runtimeStats.getMaxDuration(), archivedStats.getMaxDuration());
+ assertEquals(runtimeStats.getAverageStateSize(), archivedStats.getAverageStateSize());
+ assertEquals(runtimeStats.getMinStateSize(), archivedStats.getMinStateSize());
+ assertEquals(runtimeStats.getMaxStateSize(), archivedStats.getMaxStateSize());
+ assertEquals(runtimeStats.getCount(), archivedStats.getCount());
+ assertEquals(runtimeStats.getRecentHistory(), archivedStats.getRecentHistory());
+
+ // -------------------------------------------------------------------------------------------------------------
+ // ArchivedExecutionConfig
+ // -------------------------------------------------------------------------------------------------------------
+ ArchivedExecutionConfig runtimeConfig = runtimeGraph.getArchivedExecutionConfig();
+ ArchivedExecutionConfig archivedConfig = archivedGraph.getArchivedExecutionConfig();
+
+ assertEquals(runtimeConfig.getExecutionMode(), archivedConfig.getExecutionMode());
+ assertEquals(runtimeConfig.getParallelism(), archivedConfig.getParallelism());
+ assertEquals(runtimeConfig.getObjectReuseEnabled(), archivedConfig.getObjectReuseEnabled());
+ assertEquals(runtimeConfig.getRestartStrategyDescription(), archivedConfig.getRestartStrategyDescription());
+ assertNotNull(archivedConfig.getGlobalJobParameters().get("hello"));
+ assertEquals(runtimeConfig.getGlobalJobParameters().get("hello"), archivedConfig.getGlobalJobParameters().get("hello"));
+
+ // -------------------------------------------------------------------------------------------------------------
+ // StringifiedAccumulators
+ // -------------------------------------------------------------------------------------------------------------
+ compareStringifiedAccumulators(runtimeGraph.getAccumulatorResultsStringified(), archivedGraph.getAccumulatorResultsStringified());
+ compareSerializedAccumulators(runtimeGraph.getAccumulatorsSerialized(), archivedGraph.getAccumulatorsSerialized());
+ compareFlinkAccumulators(runtimeGraph.getFlinkAccumulators().get(executionWithAccumulatorsID), archivedGraph.getFlinkAccumulators().get(executionWithAccumulatorsID));
+
+ // -------------------------------------------------------------------------------------------------------------
+ // JobVertices
+ // -------------------------------------------------------------------------------------------------------------
+ Map<JobVertexID, ? extends AccessExecutionJobVertex> runtimeVertices = runtimeGraph.getAllVertices();
+ Map<JobVertexID, ? extends AccessExecutionJobVertex> archivedVertices = archivedGraph.getAllVertices();
+
+ // The loop below only walks the runtime vertices, so surplus archived vertices must be caught here.
+ assertEquals(runtimeVertices.size(), archivedVertices.size());
+ for (Map.Entry<JobVertexID, ? extends AccessExecutionJobVertex> vertex : runtimeVertices.entrySet()) {
+ compareExecutionJobVertex(vertex.getValue(), archivedVertices.get(vertex.getKey()));
+ }
+
+ Iterator<? extends AccessExecutionJobVertex> runtimeTopologicalVertices = runtimeGraph.getVerticesTopologically().iterator();
+ Iterator<? extends AccessExecutionJobVertex> archivedTopologicalVertices = archivedGraph.getVerticesTopologically().iterator();
+
+ while (runtimeTopologicalVertices.hasNext()) {
+ assertTrue(archivedTopologicalVertices.hasNext());
+ compareExecutionJobVertex(runtimeTopologicalVertices.next(), archivedTopologicalVertices.next());
+ }
+ // Both iterators must be exhausted at the same time.
+ assertTrue("Archived graph contains additional job vertices.", !archivedTopologicalVertices.hasNext());
+
+ // -------------------------------------------------------------------------------------------------------------
+ // OperatorCheckpointStats
+ // -------------------------------------------------------------------------------------------------------------
+ CheckpointStatsTracker runtimeTracker = runtimeGraph.getCheckpointStatsTracker();
+ CheckpointStatsTracker archivedTracker = archivedGraph.getCheckpointStatsTracker();
+ compareOperatorCheckpointStats(runtimeTracker.getOperatorStats(v1ID).get(), archivedTracker.getOperatorStats(v1ID).get());
+ compareOperatorCheckpointStats(runtimeTracker.getOperatorStats(v2ID).get(), archivedTracker.getOperatorStats(v2ID).get());
+
+ // -------------------------------------------------------------------------------------------------------------
+ // ExecutionVertices
+ // -------------------------------------------------------------------------------------------------------------
+ Iterator<? extends AccessExecutionVertex> runtimeExecutionVertices = runtimeGraph.getAllExecutionVertices().iterator();
+ Iterator<? extends AccessExecutionVertex> archivedExecutionVertices = archivedGraph.getAllExecutionVertices().iterator();
+
+ while (runtimeExecutionVertices.hasNext()) {
+ assertTrue(archivedExecutionVertices.hasNext());
+ compareExecutionVertex(runtimeExecutionVertices.next(), archivedExecutionVertices.next());
+ }
+ // Both iterators must be exhausted at the same time.
+ assertTrue("Archived graph contains additional execution vertices.", !archivedExecutionVertices.hasNext());
+ }
+
+ /**
+ * Asserts that an archived job vertex matches its runtime counterpart: basic attributes,
+ * checkpoint stats, aggregated accumulators and every task vertex (pairwise, by array index).
+ */
+ private static void compareExecutionJobVertex(AccessExecutionJobVertex runtimeJobVertex, AccessExecutionJobVertex archivedJobVertex) {
+ assertEquals(runtimeJobVertex.getName(), archivedJobVertex.getName());
+ assertEquals(runtimeJobVertex.getParallelism(), archivedJobVertex.getParallelism());
+ assertEquals(runtimeJobVertex.getMaxParallelism(), archivedJobVertex.getMaxParallelism());
+ assertEquals(runtimeJobVertex.getJobVertexId(), archivedJobVertex.getJobVertexId());
+ assertEquals(runtimeJobVertex.getAggregateState(), archivedJobVertex.getAggregateState());
+
+ OperatorCheckpointStats expectedStats = runtimeJobVertex.getCheckpointStats().get();
+ OperatorCheckpointStats actualStats = archivedJobVertex.getCheckpointStats().get();
+ compareOperatorCheckpointStats(expectedStats, actualStats);
+
+ compareStringifiedAccumulators(runtimeJobVertex.getAggregatedUserAccumulatorsStringified(), archivedJobVertex.getAggregatedUserAccumulatorsStringified());
+ compareFlinkAccumulators(runtimeJobVertex.getAggregatedMetricAccumulators(), archivedJobVertex.getAggregatedMetricAccumulators());
+
+ AccessExecutionVertex[] expectedTasks = runtimeJobVertex.getTaskVertices();
+ AccessExecutionVertex[] actualTasks = archivedJobVertex.getTaskVertices();
+ assertEquals(expectedTasks.length, actualTasks.length);
+ int subtask = 0;
+ while (subtask < expectedTasks.length) {
+ compareExecutionVertex(expectedTasks[subtask], actualTasks[subtask]);
+ subtask++;
+ }
+ }
+
+ private static void compareExecutionVertex(AccessExecutionVertex runtimeVertex, AccessExecutionVertex archivedVertex) {
+ assertEquals(runtimeVertex.getTaskNameWithSubtaskIndex(), archivedVertex.getTaskNameWithSubtaskIndex());
+ assertEquals(runtimeVertex.getParallelSubtaskIndex(), archivedVertex.getParallelSubtaskIndex());
+ assertEquals(runtimeVertex.getExecutionState(), archivedVertex.getExecutionState());
+ assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.CREATED), archivedVertex.getStateTimestamp(ExecutionState.CREATED));
+ assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.SCHEDULED), archivedVertex.getStateTimestamp(ExecutionState.SCHEDULED));
+ assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.DEPLOYING), archivedVertex.getStateTimestamp(ExecutionState.DEPLOYING));
+ assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.RUNNING), archivedVertex.getStateTimestamp(ExecutionState.RUNNING));
+ assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.FINISHED), archivedVertex.getStateTimestamp(ExecutionState.FINISHED));
+ assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.CANCELING), archivedVertex.getStateTimestamp(ExecutionState.CANCELING));
+ assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.CANCELED), archivedVertex.getStateTimestamp(ExecutionState.CANCELED));
+ assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.FAILED), archivedVertex.getStateTimestamp(ExecutionState.FAILED));
+ assertEquals(runtimeVertex.getFailureCauseAsString(), archivedVertex.getFailureCauseAsString());
+ assertEquals(runtimeVertex.getCurrentAssignedResourceLocation(), archivedVertex.getCurrentAssignedResourceLocation());
+
+ compareExecution(runtimeVertex.getCurrentExecutionAttempt(), archivedVertex.getCurrentExecutionAttempt());
+ }
+
+ private static void compareExecution(AccessExecution runtimeExecution, AccessExecution archivedExecution) {
+ assertEquals(runtimeExecution.getAttemptId(), archivedExecution.getAttemptId());
+ assertEquals(runtimeExecution.getAttemptNumber(), archivedExecution.getAttemptNumber());
+ assertArrayEquals(runtimeExecution.getStateTimestamps(), archivedExecution.getStateTimestamps());
+ assertEquals(runtimeExecution.getState(), archivedExecution.getState());
+ assertEquals(runtimeExecution.getAssignedResourceLocation(), archivedExecution.getAssignedResourceLocation());
+ assertEquals(runtimeExecution.getFailureCauseAsString(), archivedExecution.getFailureCauseAsString());
+ assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.CREATED), archivedExecution.getStateTimestamp(ExecutionState.CREATED));
+ assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.SCHEDULED), archivedExecution.getStateTimestamp(ExecutionState.SCHEDULED));
+ assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.DEPLOYING), archivedExecution.getStateTimestamp(ExecutionState.DEPLOYING));
+ assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.RUNNING), archivedExecution.getStateTimestamp(ExecutionState.RUNNING));
+ assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.FINISHED), archivedExecution.getStateTimestamp(ExecutionState.FINISHED));
+ assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.CANCELING), archivedExecution.getStateTimestamp(ExecutionState.CANCELING));
+ assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.CANCELED), archivedExecution.getStateTimestamp(ExecutionState.CANCELED));
+ assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.FAILED), archivedExecution.getStateTimestamp(ExecutionState.FAILED));
+ compareStringifiedAccumulators(runtimeExecution.getUserAccumulatorsStringified(), archivedExecution.getUserAccumulatorsStringified());
+ compareFlinkAccumulators(runtimeExecution.getFlinkAccumulators(), archivedExecution.getFlinkAccumulators());
+ assertEquals(runtimeExecution.getParallelSubtaskIndex(), archivedExecution.getParallelSubtaskIndex());
+ }
+
+ /**
+ * Asserts that both arrays have the same length and that the entries at each index agree in
+ * name, type and value.
+ */
+ private static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] runtimeAccs, StringifiedAccumulatorResult[] archivedAccs) {
+ assertEquals(runtimeAccs.length, archivedAccs.length);
+
+ int index = 0;
+ for (StringifiedAccumulatorResult expected : runtimeAccs) {
+ StringifiedAccumulatorResult actual = archivedAccs[index++];
+
+ assertEquals(expected.getName(), actual.getName());
+ assertEquals(expected.getType(), actual.getType());
+ assertEquals(expected.getValue(), actual.getValue());
+ }
+ }
+
+ /**
+ * Asserts that both maps have the same size and that, for every runtime entry, the archived
+ * entry under the same key deserializes to the same {@code long} value.
+ *
+ * @throws IOException if a value cannot be deserialized
+ * @throws ClassNotFoundException if the class of a value cannot be resolved
+ */
+ private static void compareSerializedAccumulators(Map<String, SerializedValue<Object>> runtimeAccs, Map<String, SerializedValue<Object>> archivedAccs) throws IOException, ClassNotFoundException {
+ assertEquals(runtimeAccs.size(), archivedAccs.size());
+ for (Map.Entry<String, SerializedValue<Object>> entry : runtimeAccs.entrySet()) {
+ SerializedValue<Object> expectedSerialized = entry.getValue();
+ long expected = (long) expectedSerialized.deserializeValue(ClassLoader.getSystemClassLoader());
+
+ SerializedValue<Object> actualSerialized = archivedAccs.get(entry.getKey());
+ long actual = (long) actualSerialized.deserializeValue(ClassLoader.getSystemClassLoader());
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ /**
+ * Asserts that two metric accumulator maps are equivalent.
+ *
+ * <p>Either both maps are {@code null}, or both have the same size and every runtime metric is
+ * present in the archived map with an equal local value.
+ *
+ * @param runtimeAccs the reference accumulators, may be {@code null}
+ * @param archivedAccs the accumulators under test, may be {@code null}
+ */
+ private static void compareFlinkAccumulators(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> runtimeAccs, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> archivedAccs) {
+ assertEquals(runtimeAccs == null, archivedAccs == null);
+ if (runtimeAccs == null || archivedAccs == null) {
+ return;
+ }
+
+ assertEquals(runtimeAccs.size(), archivedAccs.size());
+ for (Map.Entry<AccumulatorRegistry.Metric, Accumulator<?, ?>> runtimeAcc : runtimeAccs.entrySet()) {
+ Accumulator<?, ?> archivedAcc = archivedAccs.get(runtimeAcc.getKey());
+
+ // Report a missing metric as an assertion failure instead of a NullPointerException.
+ assertNotNull("Archived accumulators are missing metric " + runtimeAcc.getKey(), archivedAcc);
+ assertEquals(runtimeAcc.getValue().getLocalValue(), archivedAcc.getLocalValue());
+ }
+ }
+
+ /**
+ * Asserts that two operator checkpoint stats agree in their aggregate values and in the
+ * duration and state size of every subtask (not only the first one).
+ *
+ * @param runtimeStats the reference stats
+ * @param archivedStats the stats under test
+ */
+ private static void compareOperatorCheckpointStats(OperatorCheckpointStats runtimeStats, OperatorCheckpointStats archivedStats) {
+ assertEquals(runtimeStats.getNumberOfSubTasks(), archivedStats.getNumberOfSubTasks());
+ assertEquals(runtimeStats.getCheckpointId(), archivedStats.getCheckpointId());
+ assertEquals(runtimeStats.getDuration(), archivedStats.getDuration());
+ assertEquals(runtimeStats.getStateSize(), archivedStats.getStateSize());
+ assertEquals(runtimeStats.getTriggerTimestamp(), archivedStats.getTriggerTimestamp());
+
+ // The subtask counts were asserted equal above, so the runtime count bounds both sides.
+ for (int subTask = 0; subTask < runtimeStats.getNumberOfSubTasks(); subTask++) {
+ assertEquals(runtimeStats.getSubTaskDuration(subTask), archivedStats.getSubTaskDuration(subTask));
+ assertEquals(runtimeStats.getSubTaskStateSize(subTask), archivedStats.getSubTaskStateSize(subTask));
+ }
+ }
+
+ /**
+ * Creates a copy of the archived graph via {@code CommonTestUtils.createCopySerializable} and
+ * asserts that the copy matches the original.
+ */
+ private static void verifySerializability(ArchivedExecutionGraph graph) throws IOException, ClassNotFoundException {
+ compareExecutionGraph(graph, CommonTestUtils.createCopySerializable(graph));
+ }
+
+
+ /**
+ * Stub {@link CheckpointStatsTracker} that ignores completed checkpoints and always returns
+ * freshly created test stats.
+ */
+ private static class TestCheckpointStatsTracker implements CheckpointStatsTracker {
+
+ // Completed checkpoints are ignored.
+ @Override
+ public void onCompletedCheckpoint(CompletedCheckpoint checkpoint) {
+ }
+
+ // Always returns a new TestJobCheckpointStats instance.
+ @Override
+ public Option<JobCheckpointStats> getJobStats() {
+ return Option.<JobCheckpointStats>apply(new TestJobCheckpointStats());
+ }
+
+ // Returns stats whose values are offset by the upper part of the given vertex ID.
+ @Override
+ public Option<OperatorCheckpointStats> getOperatorStats(JobVertexID operatorId) {
+ return Option.<OperatorCheckpointStats>apply(new TestOperatorCheckpointStats(operatorId.getUpperPart()));
+ }
+ }
+
+ /**
+ * Stub {@link JobCheckpointStats} with an empty history and a distinct constant (1 to 7) per
+ * getter.
+ */
+ private static class TestJobCheckpointStats implements JobCheckpointStats {
+ private static final long serialVersionUID = -2630234917947292836L;
+
+ @Override
+ public List<CheckpointStats> getRecentHistory() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public long getCount() {
+ return 1;
+ }
+
+ @Override
+ public long getMinDuration() {
+ return 2;
+ }
+
+ @Override
+ public long getMaxDuration() {
+ return 4;
+ }
+
+ @Override
+ public long getAverageDuration() {
+ return 3;
+ }
+
+ @Override
+ public long getMinStateSize() {
+ return 5;
+ }
+
+ @Override
+ public long getMaxStateSize() {
+ return 7;
+ }
+
+ @Override
+ public long getAverageStateSize() {
+ return 6;
+ }
+ }
+
+ /**
+ * {@link OperatorCheckpointStats} whose constructor arguments are small constants shifted by a
+ * caller-supplied offset.
+ */
+ private static class TestOperatorCheckpointStats extends OperatorCheckpointStats {
+ private static final long serialVersionUID = -2798640928349528644L;
+
+ // Passes four offset scalars and a single {5 + offset, 6 + offset} row to the superclass.
+ // NOTE(review): presumably the row describes one subtask - confirm against OperatorCheckpointStats.
+ public TestOperatorCheckpointStats(long offset) {
+ super(1 + offset, 2 + offset, 3 + offset, 4 + offset, new long[][]{new long[]{5 + offset, 6 + offset}});
+ }
+ }
+
+ /**
+ * Global job parameters consisting of the single entry {@code "hello" -> "world"}.
+ */
+ private static class TestJobParameters extends ExecutionConfig.GlobalJobParameters {
+ private static final long serialVersionUID = -8118611781035212808L;
+ // Backing map returned as-is by toMap().
+ private Map<String, String> parameters;
+
+ private TestJobParameters() {
+ this.parameters = new HashMap<>();
+ this.parameters.put("hello", "world");
+ }
+
+ @Override
+ public Map<String, String> toMap() {
+ return parameters;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 183477a..3c6ae9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -201,7 +201,7 @@ public class JobManagerTest {
// Request the execution graph to get the runtime info
jobManagerGateway.tell(new RequestExecutionGraph(jid), testActorGateway);
- final ExecutionGraph eg = expectMsgClass(ExecutionGraphFound.class)
+ final ExecutionGraph eg = (ExecutionGraph) expectMsgClass(ExecutionGraphFound.class)
.executionGraph();
final ExecutionVertex vertex = eg.getJobVertex(sender.getID())
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 450f9fb..8a9a4ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -111,7 +111,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
assertTrue(responseExecutionGraph instanceof TestingJobManagerMessages.ExecutionGraphFound);
- ExecutionGraph executionGraph = ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
+ ExecutionGraph executionGraph = (ExecutionGraph) ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
TestJobStatusListener testListener = new TestJobStatusListener();
executionGraph.registerJobStatusListener(testListener);
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index d07c48f..72cf58b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.accumulators.Accumulator
import org.apache.flink.runtime.accumulators.AccumulatorRegistry
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph}
import org.apache.flink.runtime.instance.ActorGateway
import org.apache.flink.runtime.jobgraph.JobStatus
@@ -37,7 +37,7 @@ object TestingJobManagerMessages {
def jobID: JobID
}
- case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends
+ case class ExecutionGraphFound(jobID: JobID, executionGraph: AccessExecutionGraph) extends
ResponseExecutionGraph
case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 1259460..e0e6ecd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -40,7 +40,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -396,7 +395,7 @@ public class QueryableStateITCase extends TestLogger {
.map(new Mapper<TestingJobManagerMessages.ExecutionGraphFound, ExecutionGraph>() {
@Override
public ExecutionGraph apply(ExecutionGraphFound found) {
- return found.executionGraph();
+ return (ExecutionGraph) found.executionGraph();
}
}, TEST_ACTOR_SYSTEM.dispatcher());
ExecutionGraph eg = Await.result(egFuture, deadline.timeLeft());
@@ -553,12 +552,13 @@ public class QueryableStateITCase extends TestLogger {
.mapTo(ClassTag$.MODULE$.<JobFound>apply(JobFound.class)),
deadline.timeLeft());
- Throwable failureCause = jobFound.executionGraph().getFailureCause();
+ String failureCause = jobFound.executionGraph().getFailureCauseAsString();
- assertTrue("Not instance of SuppressRestartsException", failureCause instanceof SuppressRestartsException);
- assertTrue("Not caused by IllegalStateException", failureCause.getCause() instanceof IllegalStateException);
- Throwable duplicateException = failureCause.getCause();
- assertTrue("Exception does not contain registration name", duplicateException.getMessage().contains(queryName));
+ assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
+ int causedByIndex = failureCause.indexOf("Caused by: ");
+ String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
+ assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
+ assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
} finally {
// Free cluster resources
if (jobId != null) {
[3/3] flink git commit: [FLINK-4720] Implement archived ExecutionGraph
Posted by ch...@apache.org.
[FLINK-4720] Implement archived ExecutionGraph
This closes #2577.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21e8e2dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21e8e2dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21e8e2dc
Branch: refs/heads/master
Commit: 21e8e2dcf77f9d0dc3a74204626b776d87a9cd15
Parents: f6d8668
Author: zentol <ch...@apache.org>
Authored: Thu Sep 22 14:02:22 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Oct 14 13:55:28 2016 +0200
----------------------------------------------------------------------
.../apache/flink/api/common/Archiveable.java | 24 +
.../api/common/ArchivedExecutionConfig.java | 73 ++++
.../flink/api/common/ExecutionConfig.java | 7 +-
.../webmonitor/ExecutionGraphHolder.java | 9 +-
.../AbstractExecutionGraphRequestHandler.java | 6 +-
.../AbstractJobVertexRequestHandler.java | 10 +-
.../AbstractSubtaskAttemptRequestHandler.java | 12 +-
.../handlers/AbstractSubtaskRequestHandler.java | 10 +-
.../handlers/JobAccumulatorsHandler.java | 4 +-
.../handlers/JobCheckpointsHandler.java | 4 +-
.../webmonitor/handlers/JobConfigHandler.java | 10 +-
.../webmonitor/handlers/JobDetailsHandler.java | 13 +-
.../handlers/JobExceptionsHandler.java | 19 +-
.../webmonitor/handlers/JobPlanHandler.java | 4 +-
.../handlers/JobVertexAccumulatorsHandler.java | 4 +-
.../handlers/JobVertexBackPressureHandler.java | 9 +-
.../handlers/JobVertexCheckpointsHandler.java | 42 +-
.../handlers/JobVertexDetailsHandler.java | 10 +-
.../handlers/JobVertexTaskManagersHandler.java | 21 +-
.../SubtaskCurrentAttemptDetailsHandler.java | 4 +-
...taskExecutionAttemptAccumulatorsHandler.java | 6 +-
.../SubtaskExecutionAttemptDetailsHandler.java | 6 +-
.../SubtasksAllAccumulatorsHandler.java | 8 +-
.../handlers/SubtasksTimesHandler.java | 10 +-
.../BackPressureStatsTrackerITCase.java | 3 +-
.../StackTraceSampleCoordinatorITCase.java | 3 +-
.../JobVertexCheckpointsHandlerTest.java | 22 +-
.../ArchivedCheckpointStatsTracker.java | 53 +++
.../checkpoint/stats/CheckpointStats.java | 4 +-
.../checkpoint/stats/JobCheckpointStats.java | 3 +-
.../stats/OperatorCheckpointStats.java | 2 +
.../stats/SimpleCheckpointStatsTracker.java | 2 +
.../runtime/executiongraph/AccessExecution.java | 105 +++++
.../executiongraph/AccessExecutionGraph.java | 161 +++++++
.../AccessExecutionJobVertex.java | 98 +++++
.../executiongraph/AccessExecutionVertex.java | 85 ++++
.../executiongraph/ArchivedExecution.java | 118 +++++
.../executiongraph/ArchivedExecutionGraph.java | 297 +++++++++++++
.../ArchivedExecutionJobVertex.java | 136 ++++++
.../executiongraph/ArchivedExecutionVertex.java | 96 ++++
.../flink/runtime/executiongraph/Execution.java | 41 +-
.../runtime/executiongraph/ExecutionGraph.java | 154 ++++---
.../executiongraph/ExecutionJobVertex.java | 61 ++-
.../runtime/executiongraph/ExecutionVertex.java | 50 +--
.../archive/ExecutionConfigSummary.java | 75 ----
.../runtime/webmonitor/WebMonitorUtils.java | 14 +-
.../flink/runtime/jobmanager/JobManager.scala | 7 +-
.../runtime/jobmanager/MemoryArchivist.scala | 4 +-
.../runtime/messages/ArchiveMessages.scala | 12 +-
.../runtime/messages/JobManagerMessages.scala | 4 +-
.../checkpoint/CoordinatorShutdownTest.java | 4 +-
.../ArchivedExecutionGraphTest.java | 434 +++++++++++++++++++
.../runtime/jobmanager/JobManagerTest.java | 2 +-
.../LeaderChangeJobRecoveryTest.java | 2 +-
.../TestingJobManagerMessages.scala | 4 +-
.../flink/test/query/QueryableStateITCase.java | 14 +-
56 files changed, 2014 insertions(+), 381 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java b/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
new file mode 100644
index 0000000..09a3a0c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.io.Serializable;
+
+public interface Archiveable<T extends Serializable> {
+ T archive();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
new file mode 100644
index 0000000..faf920d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Serializable class which is created when archiving the job.
+ * It can be used to display job information on the web interface
+ * without having to keep the classloader around after job completion.
+ */
+public class ArchivedExecutionConfig implements Serializable {
+
+ private final String executionMode;
+ private final String restartStrategyDescription;
+ private final int parallelism;
+ private final boolean objectReuseEnabled;
+ private final Map<String, String> globalJobParameters;
+
+ public ArchivedExecutionConfig(ExecutionConfig ec) {
+ executionMode = ec.getExecutionMode().name();
+ if (ec.getRestartStrategy() != null) {
+ restartStrategyDescription = ec.getRestartStrategy().getDescription();
+ } else {
+ restartStrategyDescription = "default";
+ }
+ parallelism = ec.getParallelism();
+ objectReuseEnabled = ec.isObjectReuseEnabled();
+ if (ec.getGlobalJobParameters() != null
+ && ec.getGlobalJobParameters().toMap() != null) {
+ globalJobParameters = ec.getGlobalJobParameters().toMap();
+ } else {
+ globalJobParameters = Collections.emptyMap();
+ }
+ }
+
+ public String getExecutionMode() {
+ return executionMode;
+ }
+
+ public String getRestartStrategyDescription() {
+ return restartStrategyDescription;
+ }
+
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ public boolean getObjectReuseEnabled() {
+ return objectReuseEnabled;
+ }
+
+ public Map<String, String> getGlobalJobParameters() {
+ return globalJobParameters;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index aadf867..a0a63b1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -58,7 +58,7 @@ import java.util.Objects;
* </ul>
*/
@Public
-public class ExecutionConfig implements Serializable {
+public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig> {
private static final long serialVersionUID = 1L;
@@ -770,6 +770,11 @@ public class ExecutionConfig implements Serializable {
public boolean canEqual(Object obj) {
return obj instanceof ExecutionConfig;
}
+
+ @Override
+ public ArchivedExecutionConfig archive() {
+ return new ArchivedExecutionConfig(this);
+ }
// ------------------------------ Utilities ----------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 7691874..3d0cfc0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -47,7 +48,7 @@ public class ExecutionGraphHolder {
private final FiniteDuration timeout;
- private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
+ private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();
public ExecutionGraphHolder() {
this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
@@ -63,8 +64,8 @@ public class ExecutionGraphHolder {
* @param jid jobID of the execution graph to be retrieved
* @return the retrieved execution graph or null if it is not retrievable
*/
- public ExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
- ExecutionGraph cached = cache.get(jid);
+ public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
+ AccessExecutionGraph cached = cache.get(jid);
if (cached != null) {
return cached;
}
@@ -78,7 +79,7 @@ public class ExecutionGraphHolder {
return null;
}
else if (result instanceof JobManagerMessages.JobFound) {
- ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
+ AccessExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
cache.put(jid, eg);
return eg;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index 16cfb1a..ff28d4e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.NotFoundException;
@@ -53,7 +53,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage());
}
- ExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
+ AccessExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
if (eg == null) {
throw new NotFoundException("Could not find job with id " + jid);
}
@@ -61,5 +61,5 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
return handleRequest(eg, pathParams);
}
- public abstract String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception;
+ public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
index 5b12907..a36f94a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -36,7 +36,7 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
}
@Override
- public final String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+ public final String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
final String vidString = params.get("vertexid");
if (vidString == null) {
throw new IllegalArgumentException("vertexId parameter missing");
@@ -50,7 +50,7 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
}
- final ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
+ final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid);
if (jobVertex == null) {
throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
}
@@ -58,5 +58,5 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
return handleRequest(jobVertex, params);
}
- public abstract String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
+ public abstract String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index 672df16..f3a5059 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.util.Map;
@@ -37,7 +37,7 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
}
@Override
- public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
final String attemptNumberString = params.get("attempt");
if (attemptNumberString == null) {
throw new RuntimeException("Attempt number parameter missing");
@@ -51,12 +51,12 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
throw new RuntimeException("Invalid attempt number parameter");
}
- final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
+ final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
if (attempt == currentAttempt.getAttemptNumber()) {
return handleRequest(currentAttempt, params);
}
else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
- Execution exec = vertex.getPriorExecutionAttempt(attempt);
+ AccessExecution exec = vertex.getPriorExecutionAttempt(attempt);
return handleRequest(exec, params);
}
else {
@@ -64,5 +64,5 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
}
}
- public abstract String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception;
+ public abstract String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
index 90866c6..d6b279c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.util.Map;
@@ -36,7 +36,7 @@ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexReq
}
@Override
- public final String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+ public final String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
final String subtaskNumberString = params.get("subtasknum");
if (subtaskNumberString == null) {
throw new RuntimeException("Subtask number parameter missing");
@@ -54,9 +54,9 @@ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexReq
throw new RuntimeException("subtask does not exist: " + subtask);
}
- final ExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
+ final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
return handleRequest(vertex, params);
}
- public abstract String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception;
+ public abstract String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index c5418b3..29613a0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.io.StringWriter;
@@ -36,7 +36,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
}
@Override
- public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
StringWriter writer = new StringWriter();
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
index b63ab0e..404a14e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import scala.Option;
@@ -39,7 +39,7 @@ public class JobCheckpointsHandler extends AbstractExecutionGraphRequestHandler
}
@Override
- public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 75389b1..21639ef 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -22,8 +22,8 @@ import java.io.StringWriter;
import java.util.Map;
import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
/**
@@ -36,7 +36,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
@@ -45,7 +45,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
gen.writeStringField("jid", graph.getJobID().toString());
gen.writeStringField("name", graph.getJobName());
- final ExecutionConfigSummary summary = graph.getExecutionConfigSummary();
+ final ArchivedExecutionConfig summary = graph.getArchivedExecutionConfig();
if (summary != null) {
gen.writeObjectFieldStart("execution-config");
@@ -59,7 +59,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
Map<String, String> ucVals = summary.getGlobalJobParameters();
if (ucVals != null) {
gen.writeObjectFieldStart("user-config");
-
+
for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
gen.writeStringField(ucVal.getKey(), ucVal.getValue());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 884b859..e7a2a8c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -24,9 +24,10 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -50,7 +51,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
final StringWriter writer = new StringWriter();
final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
@@ -84,13 +85,13 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
int[] jobVerticesPerState = new int[ExecutionState.values().length];
gen.writeArrayFieldStart("vertices");
- for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
+ for (AccessExecutionJobVertex ejv : graph.getVerticesTopologically()) {
int[] tasksPerState = new int[ExecutionState.values().length];
long startTime = Long.MAX_VALUE;
long endTime = 0;
boolean allFinished = true;
- for (ExecutionVertex vertex : ejv.getTaskVertices()) {
+ for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
final ExecutionState state = vertex.getExecutionState();
tasksPerState[state.ordinal()]++;
@@ -133,7 +134,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
gen.writeStartObject();
gen.writeStringField("id", ejv.getJobVertexId().toString());
- gen.writeStringField("name", ejv.getJobVertex().getName());
+ gen.writeStringField("name", ejv.getName());
gen.writeNumberField("parallelism", ejv.getParallelism());
gen.writeStringField("status", jobVertexState.name());
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index ce154e3..90197d0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -19,11 +19,10 @@
package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.util.ExceptionUtils;
import java.io.StringWriter;
import java.util.Map;
@@ -40,16 +39,16 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
gen.writeStartObject();
// most important is the root failure cause
- Throwable rootException = graph.getFailureCause();
+ String rootException = graph.getFailureCauseAsString();
if (rootException != null) {
- gen.writeStringField("root-exception", ExceptionUtils.stringifyException(rootException));
+ gen.writeStringField("root-exception", rootException);
}
// we additionally collect all exceptions (up to a limit) that occurred in the individual tasks
@@ -58,8 +57,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
int numExceptionsSoFar = 0;
boolean truncated = false;
- for (ExecutionVertex task : graph.getAllExecutionVertices()) {
- Throwable t = task.getFailureCause();
+ for (AccessExecutionVertex task : graph.getAllExecutionVertices()) {
+ String t = task.getFailureCauseAsString();
if (t != null) {
if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
truncated = true;
@@ -71,8 +70,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)";
gen.writeStartObject();
- gen.writeStringField("exception", ExceptionUtils.stringifyException(t));
- gen.writeStringField("task", task.getSimpleName());
+ gen.writeStringField("exception", t);
+ gen.writeStringField("task", task.getTaskNameWithSubtaskIndex());
gen.writeStringField("location", locationString);
gen.writeEndObject();
numExceptionsSoFar++;
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index 0389f5a..64f7000 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.util.Map;
@@ -34,7 +34,7 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
return graph.getJsonPlan();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 5df565a..ad4e207 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.io.StringWriter;
@@ -35,7 +35,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
}
@Override
- public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
StringWriter writer = new StringWriter();
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index 65f82a3..c5bacf2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -56,9 +58,12 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
@Override
public String handleRequest(
- ExecutionJobVertex jobVertex,
+ AccessExecutionJobVertex accessJobVertex,
Map<String, String> params) throws Exception {
-
+ if (accessJobVertex instanceof ArchivedExecutionJobVertex) {
+ return "";
+ }
+ ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex;
try (StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
index 6522de5..8a68ffa 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
@@ -19,9 +19,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import scala.Option;
@@ -38,36 +37,31 @@ public class JobVertexCheckpointsHandler extends AbstractJobVertexRequestHandler
}
@Override
- public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
gen.writeStartObject();
- CheckpointStatsTracker tracker = jobVertex.getGraph().getCheckpointStatsTracker();
+ Option<OperatorCheckpointStats> statsOption = jobVertex.getCheckpointStats();
- if (tracker != null) {
- Option<OperatorCheckpointStats> statsOption = tracker
- .getOperatorStats(jobVertex.getJobVertexId());
+ if (statsOption.isDefined()) {
+ OperatorCheckpointStats stats = statsOption.get();
- if (statsOption.isDefined()) {
- OperatorCheckpointStats stats = statsOption.get();
+ gen.writeNumberField("id", stats.getCheckpointId());
+ gen.writeNumberField("timestamp", stats.getTriggerTimestamp());
+ gen.writeNumberField("duration", stats.getDuration());
+ gen.writeNumberField("size", stats.getStateSize());
+ gen.writeNumberField("parallelism", stats.getNumberOfSubTasks());
- gen.writeNumberField("id", stats.getCheckpointId());
- gen.writeNumberField("timestamp", stats.getTriggerTimestamp());
- gen.writeNumberField("duration", stats.getDuration());
- gen.writeNumberField("size", stats.getStateSize());
- gen.writeNumberField("parallelism", stats.getNumberOfSubTasks());
-
- gen.writeArrayFieldStart("subtasks");
- for (int i = 0; i < stats.getNumberOfSubTasks(); i++) {
- gen.writeStartObject();
- gen.writeNumberField("subtask", i);
- gen.writeNumberField("duration", stats.getSubTaskDuration(i));
- gen.writeNumberField("size", stats.getSubTaskStateSize(i));
- gen.writeEndObject();
- }
- gen.writeEndArray();
+ gen.writeArrayFieldStart("subtasks");
+ for (int i = 0; i < stats.getNumberOfSubTasks(); i++) {
+ gen.writeStartObject();
+ gen.writeNumberField("subtask", i);
+ gen.writeNumberField("duration", stats.getSubTaskDuration(i));
+ gen.writeNumberField("size", stats.getSubTaskStateSize(i));
+ gen.writeEndObject();
}
+ gen.writeEndArray();
}
gen.writeEndObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 813ecb8..fbdd86b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -43,7 +43,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
}
@Override
- public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
final long now = System.currentTimeMillis();
StringWriter writer = new StringWriter();
@@ -52,13 +52,13 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
gen.writeStartObject();
gen.writeStringField("id", jobVertex.getJobVertexId().toString());
- gen.writeStringField("name", jobVertex.getJobVertex().getName());
+ gen.writeStringField("name", jobVertex.getName());
gen.writeNumberField("parallelism", jobVertex.getParallelism());
gen.writeNumberField("now", now);
gen.writeArrayFieldStart("subtasks");
int num = 0;
- for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
final ExecutionState status = vertex.getExecutionState();
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index cbdb87f..0e94334 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -23,8 +23,9 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -46,18 +47,18 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
}
@Override
- public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
// Build a map that groups tasks by TaskManager
- Map<String, List<ExecutionVertex>> taskManagerVertices = new HashMap<>();
+ Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
- for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
- List<ExecutionVertex> vertices = taskManagerVertices.get(taskManager);
+ List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager);
if (vertices == null) {
- vertices = new ArrayList<ExecutionVertex>();
+ vertices = new ArrayList<>();
taskManagerVertices.put(taskManager, vertices);
}
@@ -73,13 +74,13 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
gen.writeStartObject();
gen.writeStringField("id", jobVertex.getJobVertexId().toString());
- gen.writeStringField("name", jobVertex.getJobVertex().getName());
+ gen.writeStringField("name", jobVertex.getName());
gen.writeNumberField("now", now);
gen.writeArrayFieldStart("taskmanagers");
- for (Entry<String, List<ExecutionVertex>> entry : taskManagerVertices.entrySet()) {
+ for (Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
String host = entry.getKey();
- List<ExecutionVertex> taskVertices = entry.getValue();
+ List<AccessExecutionVertex> taskVertices = entry.getValue();
int[] tasksPerState = new int[ExecutionState.values().length];
@@ -92,7 +93,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
LongCounter tmReadRecords = new LongCounter();
LongCounter tmWriteRecords = new LongCounter();
- for (ExecutionVertex vertex : taskVertices) {
+ for (AccessExecutionVertex vertex : taskVertices) {
final ExecutionState state = vertex.getExecutionState();
tasksPerState[state.ordinal()]++;
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index d301bd1..811bea6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.util.Map;
@@ -33,7 +33,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
}
@Override
- public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
return handleRequest(vertex.getCurrentExecutionAttempt(), params);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 14ccc0c..786f5e8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.io.StringWriter;
@@ -38,7 +38,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
}
@Override
- public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
StringWriter writer = new StringWriter();
@@ -46,7 +46,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
gen.writeStartObject();
- gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex());
+ gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex());
gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
gen.writeStringField("id", execAttempt.getAttemptId().toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index a1e6d0e..3cc7376 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -41,7 +41,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
}
@Override
- public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
final ExecutionState status = execAttempt.getState();
final long now = System.currentTimeMillis();
@@ -78,7 +78,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
gen.writeStartObject();
- gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex());
+ gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex());
gen.writeStringField("status", status.name());
gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
gen.writeStringField("host", locationString);
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 780bd4b..892a606 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -39,7 +39,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
}
@Override
- public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
@@ -50,7 +50,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
gen.writeArrayFieldStart("subtasks");
int num = 0;
- for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 9e6276d..76349ee 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -41,7 +41,7 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
}
@Override
- public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+ public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
final long now = System.currentTimeMillis();
StringWriter writer = new StringWriter();
@@ -50,13 +50,13 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
gen.writeStartObject();
gen.writeStringField("id", jobVertex.getJobVertexId().toString());
- gen.writeStringField("name", jobVertex.getJobVertex().getName());
+ gen.writeStringField("name", jobVertex.getName());
gen.writeNumberField("now", now);
gen.writeArrayFieldStart("subtasks");
int num = 0;
- for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
ExecutionState status = vertex.getExecutionState();
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 25dc189..507c977 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
@@ -153,7 +152,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
ExecutionGraphFound executionGraphResponse =
expectMsgClass(ExecutionGraphFound.class);
- ExecutionGraph executionGraph = executionGraphResponse.executionGraph();
+ ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 9b1f608..c4ce9d1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -125,7 +124,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor);
ExecutionGraphFound executionGraphResponse =
expectMsgClass(ExecutionGraphFound.class);
- ExecutionGraph executionGraph = executionGraphResponse.executionGraph();
+ ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
index f882663..18aae35 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
@@ -20,9 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -37,8 +35,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -49,10 +45,9 @@ public class JobVertexCheckpointsHandlerTest {
JobVertexCheckpointsHandler handler = new JobVertexCheckpointsHandler(
mock(ExecutionGraphHolder.class));
- ExecutionGraph graph = mock(ExecutionGraph.class);
ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-
- when(vertex.getGraph()).thenReturn(graph);
+ when(vertex.getCheckpointStats())
+ .thenReturn(Option.<OperatorCheckpointStats>empty());
String response = handler.handleRequest(vertex, Collections.<String, String>emptyMap());
@@ -65,15 +60,10 @@ public class JobVertexCheckpointsHandlerTest {
JobVertexCheckpointsHandler handler = new JobVertexCheckpointsHandler(
mock(ExecutionGraphHolder.class));
- ExecutionGraph graph = mock(ExecutionGraph.class);
ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
- CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-
- when(vertex.getGraph()).thenReturn(graph);
- when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
// No stats
- when(tracker.getOperatorStats(any(JobVertexID.class)))
+ when(vertex.getCheckpointStats())
.thenReturn(Option.<OperatorCheckpointStats>empty());
String response = handler.handleRequest(vertex, Collections.<String, String>emptyMap());
@@ -89,13 +79,9 @@ public class JobVertexCheckpointsHandlerTest {
JobVertexID vertexId = new JobVertexID();
- ExecutionGraph graph = mock(ExecutionGraph.class);
ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
- CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
when(vertex.getJobVertexId()).thenReturn(vertexId);
- when(vertex.getGraph()).thenReturn(graph);
- when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
long[][] subTaskStats = new long[][] {
new long[] { 1, 10 },
@@ -113,7 +99,7 @@ public class JobVertexCheckpointsHandlerTest {
OperatorCheckpointStats stats = new OperatorCheckpointStats(
3, 6812, 2800, 1024, subTaskStats);
- when(tracker.getOperatorStats(eq(vertexId)))
+ when(vertex.getCheckpointStats())
.thenReturn(Option.apply(stats));
// Request stats
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
new file mode 100644
index 0000000..92df7d7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import scala.Option;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class ArchivedCheckpointStatsTracker implements CheckpointStatsTracker, Serializable {
+ private static final long serialVersionUID = 1469003563086353555L;
+
+ private final Option<JobCheckpointStats> jobStats;
+ private final Map<JobVertexID, OperatorCheckpointStats> operatorStats;
+
+ public ArchivedCheckpointStatsTracker(Option<JobCheckpointStats> jobStats, Map<JobVertexID, OperatorCheckpointStats> operatorStats) {
+ this.jobStats = jobStats;
+ this.operatorStats = operatorStats;
+ }
+
+ @Override
+ public void onCompletedCheckpoint(CompletedCheckpoint checkpoint) {
+ }
+
+ @Override
+ public Option<JobCheckpointStats> getJobStats() {
+ return jobStats;
+ }
+
+ @Override
+ public Option<OperatorCheckpointStats> getOperatorStats(JobVertexID operatorId) {
+ return Option.apply(operatorStats.get(operatorId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
index dc239dd..64f17d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
@@ -18,10 +18,12 @@
package org.apache.flink.runtime.checkpoint.stats;
+import java.io.Serializable;
+
/**
* Statistics for a specific checkpoint.
*/
-public class CheckpointStats {
+public class CheckpointStats implements Serializable {
/** ID of the checkpoint. */
private final long checkpointId;
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
index b1d7ff2..e156c8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
@@ -20,12 +20,13 @@ package org.apache.flink.runtime.checkpoint.stats;
import org.apache.flink.configuration.ConfigConstants;
+import java.io.Serializable;
import java.util.List;
/**
* Snapshot of checkpoint statistics for a job.
*/
-public interface JobCheckpointStats {
+public interface JobCheckpointStats extends Serializable {
// ------------------------------------------------------------------------
// General stats
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
index 5b113d8..6c2d497 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
@@ -27,6 +27,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class OperatorCheckpointStats extends CheckpointStats {
+ private static final long serialVersionUID = -1594736655739376140L;
+
/** Duration in milliseconds and state sizes in bytes per sub task. */
private final long[][] subTaskStats;
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index db8a0e0..39fbad5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -348,6 +348,8 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
*/
private static class JobCheckpointStatsSnapshot implements JobCheckpointStats {
+ private static final long serialVersionUID = 7558212015099742418L;
+
// General
private final List<CheckpointStats> recentHistory;
private final long count;
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
new file mode 100644
index 0000000..aefc17d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Map;
+
+/**
+ * Common interface for the runtime {@link Execution} and {@link ArchivedExecution}.
+ */
+public interface AccessExecution {
+ /**
+ * Returns the {@link ExecutionAttemptID} for this Execution.
+ *
+ * @return ExecutionAttemptID for this execution
+ */
+ ExecutionAttemptID getAttemptId();
+
+ /**
+ * Returns the attempt number for this execution.
+ *
+ * @return attempt number for this execution.
+ */
+ int getAttemptNumber();
+
+ /**
+ * Returns the timestamps for every {@link ExecutionState}.
+ *
+ * @return timestamps for each state
+ */
+ long[] getStateTimestamps();
+
+ /**
+ * Returns the current {@link ExecutionState} for this execution.
+ *
+ * @return execution state for this execution
+ */
+ ExecutionState getState();
+
+ /**
+ * Returns the {@link TaskManagerLocation} for this execution.
+ *
+ * @return taskmanager location for this execution.
+ */
+ TaskManagerLocation getAssignedResourceLocation();
+
+ /**
+ * Returns the exception that caused the job to fail. This is the first root exception
+ * that was not recoverable and triggered job failure.
+ *
+ * @return failure exception as a string, or {@code "(null)"}
+ */
+ String getFailureCauseAsString();
+
+ /**
+ * Returns the timestamp for the given {@link ExecutionState}.
+ *
+ * @param state state for which the timestamp should be returned
+ * @return timestamp for the given state
+ */
+ long getStateTimestamp(ExecutionState state);
+
+ /**
+ * Returns the user-defined accumulators as strings.
+ *
+ * @return user-defined accumulators as strings.
+ */
+ StringifiedAccumulatorResult[] getUserAccumulatorsStringified();
+
+ /**
+ * Returns the system-defined accumulators.
+ *
+ * @return system-defined accumulators.
+ * @deprecated Will be removed in FLINK-4527
+ */
+ @Deprecated
+ Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators();
+
+ /**
+ * Returns the subtask index of this execution.
+ *
+ * @return subtask index of this execution.
+ */
+ int getParallelSubtaskIndex();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
new file mode 100644
index 0000000..0ff6ace
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Common interface for the runtime {@link ExecutionGraph} and {@link ArchivedExecutionGraph}.
+ */
+public interface AccessExecutionGraph {
+ /**
+ * Returns the job plan as a JSON string.
+ *
+ * @return job plan as a JSON string
+ */
+ String getJsonPlan();
+
+ /**
+ * Returns the {@link JobID} for this execution graph.
+ *
+ * @return job ID for this execution graph
+ */
+ JobID getJobID();
+
+ /**
+ * Returns the job name for this execution graph.
+ *
+ * @return job name for this execution graph
+ */
+ String getJobName();
+
+ /**
+ * Returns the current {@link JobStatus} for this execution graph.
+ *
+ * @return job status for this execution graph
+ */
+ JobStatus getState();
+
+ /**
+ * Returns the exception that caused the job to fail. This is the first root exception
+ * that was not recoverable and triggered job failure.
+ *
+ * @return failure causing exception as a string, or {@code "(null)"}
+ */
+ String getFailureCauseAsString();
+
+ /**
+ * Returns the job vertex for the given {@link JobVertexID}.
+ *
+ * @param id id of job vertex to be returned
+ * @return job vertex for the given id, or null
+ */
+ AccessExecutionJobVertex getJobVertex(JobVertexID id);
+
+ /**
+ * Returns a map containing all job vertices for this execution graph.
+ *
+ * @return map containing all job vertices for this execution graph
+ */
+ Map<JobVertexID, ? extends AccessExecutionJobVertex> getAllVertices();
+
+ /**
+ * Returns an iterable containing all job vertices for this execution graph in the order they were created.
+ *
+ * @return iterable containing all job vertices for this execution graph in the order they were created
+ */
+ Iterable<? extends AccessExecutionJobVertex> getVerticesTopologically();
+
+ /**
+ * Returns an iterable containing all execution vertices for this execution graph.
+ *
+ * @return iterable containing all execution vertices for this execution graph
+ */
+ Iterable<? extends AccessExecutionVertex> getAllExecutionVertices();
+
+ /**
+ * Returns the timestamp for the given {@link JobStatus}.
+ *
+ * @param status status for which the timestamp should be returned
+ * @return timestamp for the given job status
+ */
+ long getStatusTimestamp(JobStatus status);
+
+ /**
+ * Returns the {@link CheckpointStatsTracker} for this execution graph.
+ *
+ * @return CheckpointStatsTracker for this execution graph
+ */
+ CheckpointStatsTracker getCheckpointStatsTracker();
+
+ /**
+ * Returns the {@link ArchivedExecutionConfig} for this execution graph.
+ *
+ * @return execution config summary for this execution graph, or null in case of errors
+ */
+ ArchivedExecutionConfig getArchivedExecutionConfig();
+
+ /**
+ * Returns whether the job for this execution graph is stoppable.
+ *
+ * @return true, if all source tasks are stoppable, false otherwise
+ */
+ boolean isStoppable();
+
+ /**
+ * Returns the aggregated user-defined accumulators as strings.
+ *
+ * @return aggregated user-defined accumulators as strings.
+ */
+ StringifiedAccumulatorResult[] getAccumulatorResultsStringified();
+
+ /**
+ * Returns a map containing the serialized values of user-defined accumulators.
+ *
+ * @return map containing serialized values of user-defined accumulators
+ * @throws IOException indicates that the serialization has failed
+ */
+ Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException;
+
+ /**
+ * Returns the aggregated system-defined accumulators.
+ *
+ * @return aggregated system-defined accumulators.
+ * @deprecated Will be removed in FLINK-4527
+ */
+ @Deprecated
+ Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators();
+
+ /**
+ * Returns whether this execution graph was archived.
+ *
+ * @return true, if the execution graph was archived, false otherwise
+ */
+ boolean isArchived();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
new file mode 100644
index 0000000..c9bf604
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import scala.Option;
+
+import java.util.Map;
+
+/**
+ * Common interface for the runtime {@link ExecutionJobVertex} and {@link ArchivedExecutionJobVertex}.
+ */
+public interface AccessExecutionJobVertex {
+ /**
+ * Returns the name for this job vertex.
+ *
+ * @return name for this job vertex.
+ */
+ String getName();
+
+ /**
+ * Returns the parallelism for this job vertex.
+ *
+ * @return parallelism for this job vertex.
+ */
+ int getParallelism();
+
+ /**
+ * Returns the max parallelism for this job vertex.
+ *
+ * @return max parallelism for this job vertex.
+ */
+ int getMaxParallelism();
+
+ /**
+ * Returns the {@link JobVertexID} for this job vertex.
+ *
+ * @return JobVertexID for this job vertex.
+ */
+ JobVertexID getJobVertexId();
+
+ /**
+ * Returns all execution vertices for this job vertex.
+ *
+ * @return all execution vertices for this job vertex
+ */
+ AccessExecutionVertex[] getTaskVertices();
+
+ /**
+ * Returns the aggregated {@link ExecutionState} for this job vertex.
+ *
+ * @return aggregated state for this job vertex
+ */
+ ExecutionState getAggregateState();
+
+ /**
+ * Returns the {@link OperatorCheckpointStats} for this job vertex.
+ *
+ * @return checkpoint stats for this job vertex.
+ */
+ Option<OperatorCheckpointStats> getCheckpointStats();
+
+ /**
+ * Returns the aggregated system-defined accumulators.
+ *
+ * @return aggregated system-defined accumulators.
+ * @deprecated Will be removed in FLINK-4527
+ */
+ @Deprecated
+ Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators();
+
+ /**
+ * Returns the aggregated user-defined accumulators as strings.
+ *
+ * @return aggregated user-defined accumulators as strings.
+ */
+ StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified();
+}
[2/3] flink git commit: [FLINK-4720] Implement archived ExecutionGraph
Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
new file mode 100644
index 0000000..9faf3fb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Common interface for the runtime {@link ExecutionVertex} and {@link ArchivedExecutionVertex}.
+ */
+public interface AccessExecutionVertex {
+ /**
+ * Returns the name of this execution vertex in the format "myTask (2/7)".
+ *
+ * @return name of this execution vertex
+ */
+ String getTaskNameWithSubtaskIndex();
+
+ /**
+ * Returns the subtask index of this execution vertex.
+ *
+ * @return subtask index of this execution vertex.
+ */
+ int getParallelSubtaskIndex();
+
+ /**
+ * Returns the current execution for this execution vertex.
+ *
+ * @return current execution
+ */
+ AccessExecution getCurrentExecutionAttempt();
+
+ /**
+ * Returns the current {@link ExecutionState} for this execution vertex.
+ *
+ * @return execution state for this execution vertex
+ */
+ ExecutionState getExecutionState();
+
+ /**
+ * Returns the timestamp for the given {@link ExecutionState}.
+ *
+ * @param state state for which the timestamp should be returned
+ * @return timestamp for the given state
+ */
+ long getStateTimestamp(ExecutionState state);
+
+ /**
+ * Returns the exception that caused the job to fail. This is the first root exception
+ * that was not recoverable and triggered job failure.
+ *
+ * @return failure exception as a string, or {@code "(null)"}
+ */
+ String getFailureCauseAsString();
+
+ /**
+ * Returns the {@link TaskManagerLocation} for this execution vertex.
+ *
+ * @return taskmanager location for this execution vertex.
+ */
+ TaskManagerLocation getCurrentAssignedResourceLocation();
+
+ /**
+ * Returns the execution for the given attempt number.
+ *
+ * @param attemptNumber attempt number of execution to be returned
+ * @return execution for the given attempt number
+ */
+ AccessExecution getPriorExecutionAttempt(int attemptNumber);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
new file mode 100644
index 0000000..0b2992f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class ArchivedExecution implements AccessExecution, Serializable {
+ private static final long serialVersionUID = 4817108757483345173L;
+ // --------------------------------------------------------------------------------------------
+
+ private final ExecutionAttemptID attemptId;
+
+ private final long[] stateTimestamps;
+
+ private final int attemptNumber;
+
+ private final ExecutionState state;
+
+ private final String failureCause; // once assigned, never changes
+
+ private final TaskManagerLocation assignedResourceLocation; // for the archived execution
+
+ /* Stringified user-defined accumulators, captured once from the execution at archival time */
+ private final StringifiedAccumulatorResult[] userAccumulators;
+
+ /* Map of internal accumulators as obtained from the execution when it was archived */
+ private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+ private final int parallelSubtaskIndex;
+
+ public ArchivedExecution(Execution execution) {
+ this.userAccumulators = execution.getUserAccumulatorsStringified();
+ this.flinkAccumulators = execution.getFlinkAccumulators();
+ this.attemptId = execution.getAttemptId();
+ this.attemptNumber = execution.getAttemptNumber();
+ this.stateTimestamps = execution.getStateTimestamps();
+ this.parallelSubtaskIndex = execution.getVertex().getParallelSubtaskIndex();
+ this.state = execution.getState();
+ this.failureCause = ExceptionUtils.stringifyException(execution.getFailureCause());
+ this.assignedResourceLocation = execution.getAssignedResourceLocation();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Accessors
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public ExecutionAttemptID getAttemptId() {
+ return attemptId;
+ }
+
+ @Override
+ public int getAttemptNumber() {
+ return attemptNumber;
+ }
+
+ @Override
+ public long[] getStateTimestamps() {
+ return stateTimestamps;
+ }
+
+ @Override
+ public ExecutionState getState() {
+ return state;
+ }
+
+ @Override
+ public TaskManagerLocation getAssignedResourceLocation() {
+ return assignedResourceLocation;
+ }
+
+ @Override
+ public String getFailureCauseAsString() {
+ return failureCause;
+ }
+
+ @Override
+ public long getStateTimestamp(ExecutionState state) {
+ return this.stateTimestamps[state.ordinal()];
+ }
+
+ @Override
+ public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
+ return userAccumulators;
+ }
+
+ @Override
+ public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
+ return flinkAccumulators;
+ }
+
+ @Override
+ public int getParallelSubtaskIndex() {
+ return parallelSubtaskIndex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
new file mode 100644
index 0000000..493825a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable {
+ private static final long serialVersionUID = 7231383912742578428L;
+ // --------------------------------------------------------------------------------------------
+
+ /** The ID of the job this graph has been built for. */
+ private final JobID jobID;
+
+ /** The name of the original job graph. */
+ private final String jobName;
+
+ /** All job vertices that are part of this graph */
+ private final Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
+
+ /** All vertices, in the order in which they were created **/
+ private final List<ArchivedExecutionJobVertex> verticesInCreationOrder;
+
+ /**
+ * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
+ * the execution graph transitioned into a certain state. The index into this array is the
+ * ordinal of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is
+ * at {@code stateTimestamps[RUNNING.ordinal()]}.
+ */
+ private final long[] stateTimestamps;
+
+ // ------ Configuration of the Execution -------
+
+ // ------ Execution status and progress. These values are volatile, and accessed under the lock -------
+
+ /** Current status of the job execution */
+ private final JobStatus state;
+
+ /**
+ * The exception that caused the job to fail. This is set to the first root exception
+ * that was not recoverable and triggered job failure
+ */
+ private final String failureCause;
+
+ // ------ Fields that are only relevant for archived execution graphs ------------
+ private final String jsonPlan;
+ private final StringifiedAccumulatorResult[] archivedUserAccumulators;
+ private final ArchivedExecutionConfig archivedExecutionConfig;
+ private final boolean isStoppable;
+ private final Map<String, SerializedValue<Object>> serializedUserAccumulators;
+ private final ArchivedCheckpointStatsTracker tracker;
+
+ public ArchivedExecutionGraph(
+ JobID jobID,
+ String jobName,
+ Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
+ List<ArchivedExecutionJobVertex> verticesInCreationOrder,
+ long[] stateTimestamps,
+ JobStatus state,
+ String failureCause,
+ String jsonPlan,
+ StringifiedAccumulatorResult[] archivedUserAccumulators,
+ Map<String, SerializedValue<Object>> serializedUserAccumulators,
+ ArchivedExecutionConfig executionConfig,
+ boolean isStoppable,
+ ArchivedCheckpointStatsTracker tracker
+ ) {
+ this.jobID = jobID;
+ this.jobName = jobName;
+ this.tasks = tasks;
+ this.verticesInCreationOrder = verticesInCreationOrder;
+ this.stateTimestamps = stateTimestamps;
+ this.state = state;
+ this.failureCause = failureCause;
+ this.jsonPlan = jsonPlan;
+ this.archivedUserAccumulators = archivedUserAccumulators;
+ this.serializedUserAccumulators = serializedUserAccumulators;
+ this.archivedExecutionConfig = executionConfig;
+ this.isStoppable = isStoppable;
+ this.tracker = tracker;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ @Override
+ public String getJsonPlan() {
+ return jsonPlan;
+ }
+
+ @Override
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ @Override
+ public String getJobName() {
+ return jobName;
+ }
+
+ @Override
+ public JobStatus getState() {
+ return state;
+ }
+
+ @Override
+ public String getFailureCauseAsString() {
+ return failureCause;
+ }
+
+ @Override
+ public ArchivedExecutionJobVertex getJobVertex(JobVertexID id) {
+ return this.tasks.get(id);
+ }
+
+ @Override
+ public Map<JobVertexID, AccessExecutionJobVertex> getAllVertices() {
+ return Collections.<JobVertexID, AccessExecutionJobVertex>unmodifiableMap(this.tasks);
+ }
+
+ @Override
+ public Iterable<ArchivedExecutionJobVertex> getVerticesTopologically() {
+ // we return a specific iterator that does not fail with concurrent modifications
+ // the list is append only, so it is safe for that
+ final int numElements = this.verticesInCreationOrder.size();
+
+ return new Iterable<ArchivedExecutionJobVertex>() {
+ @Override
+ public Iterator<ArchivedExecutionJobVertex> iterator() {
+ return new Iterator<ArchivedExecutionJobVertex>() {
+ private int pos = 0;
+
+ @Override
+ public boolean hasNext() {
+ return pos < numElements;
+ }
+
+ @Override
+ public ArchivedExecutionJobVertex next() {
+ if (hasNext()) {
+ return verticesInCreationOrder.get(pos++);
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public Iterable<ArchivedExecutionVertex> getAllExecutionVertices() {
+ return new Iterable<ArchivedExecutionVertex>() {
+ @Override
+ public Iterator<ArchivedExecutionVertex> iterator() {
+ return new AllVerticesIterator(getVerticesTopologically().iterator());
+ }
+ };
+ }
+
+ @Override
+ public long getStatusTimestamp(JobStatus status) {
+ return this.stateTimestamps[status.ordinal()];
+ }
+
+ @Override
+ public CheckpointStatsTracker getCheckpointStatsTracker() {
+ return tracker;
+ }
+
+ /**
+ * Gets the internal flink accumulator map of maps which contains some metrics.
+ *
+ * @return A map of accumulators for every executed task.
+ */
+ @Override
+ public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators() {
+ Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators =
+ new HashMap<>();
+
+ for (AccessExecutionVertex vertex : getAllExecutionVertices()) {
+ Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
+ flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskAccs);
+ }
+
+ return flinkAccumulators;
+ }
+
+ @Override
+ public boolean isArchived() {
+ return true;
+ }
+
+ public StringifiedAccumulatorResult[] getUserAccumulators() {
+ return archivedUserAccumulators;
+ }
+
+ public ArchivedExecutionConfig getArchivedExecutionConfig() {
+ return archivedExecutionConfig;
+ }
+
+ @Override
+ public boolean isStoppable() {
+ return isStoppable;
+ }
+
+ @Override
+ public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
+ return archivedUserAccumulators;
+ }
+
+ @Override
+ public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
+ return serializedUserAccumulators;
+ }
+
+ class AllVerticesIterator implements Iterator<ArchivedExecutionVertex> {
+
+ private final Iterator<ArchivedExecutionJobVertex> jobVertices;
+
+ private ArchivedExecutionVertex[] currVertices;
+
+ private int currPos;
+
+
+ public AllVerticesIterator(Iterator<ArchivedExecutionJobVertex> jobVertices) {
+ this.jobVertices = jobVertices;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ while (true) {
+ if (currVertices != null) {
+ if (currPos < currVertices.length) {
+ return true;
+ } else {
+ currVertices = null;
+ }
+ } else if (jobVertices.hasNext()) {
+ currVertices = jobVertices.next().getTaskVertices();
+ currPos = 0;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public ArchivedExecutionVertex next() {
+ if (hasNext()) {
+ return currVertices[currPos++];
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
new file mode 100644
index 0000000..4857bf5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import scala.Option;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregateJobVertexState;
+
+/**
+ * Serializable snapshot of an {@link ExecutionJobVertex}, exposed through
+ * {@link AccessExecutionJobVertex}. All values are read from the live job vertex in the
+ * constructor and stored in final fields.
+ */
+public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Serializable {
+
+    private static final long serialVersionUID = -5768187638639437957L;
+
+    /** Archived forms of the job vertex's task vertices, in the same order. */
+    private final ArchivedExecutionVertex[] taskVertices;
+
+    private final JobVertexID id;
+
+    private final String name;
+
+    private final int parallelism;
+
+    private final int maxParallelism;
+
+    /** Aggregated metric accumulators as returned by the job vertex. */
+    private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> aggregatedMetricAccumulators;
+
+    /** Checkpoint statistics of this operator; empty if the graph had no stats tracker. */
+    private final Option<OperatorCheckpointStats> checkpointStats;
+
+    /** Aggregated, stringified user accumulators as returned by the job vertex. */
+    private final StringifiedAccumulatorResult[] archivedUserAccumulators;
+
+    /**
+     * Archives the given job vertex.
+     *
+     * @param jobVertex the live job vertex to take the snapshot from
+     */
+    public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
+        // fetch the live task vertices once instead of once per loop iteration
+        final ExecutionVertex[] liveVertices = jobVertex.getTaskVertices();
+        this.taskVertices = new ArchivedExecutionVertex[liveVertices.length];
+        for (int x = 0; x < taskVertices.length; x++) {
+            taskVertices[x] = liveVertices[x].archive();
+        }
+
+        aggregatedMetricAccumulators = jobVertex.getAggregatedMetricAccumulators();
+
+        // The aggregated user accumulators come from the job vertex itself. An earlier
+        // version additionally merged all subtask accumulators into a temporary map that was
+        // never read; that dead work has been dropped.
+        archivedUserAccumulators = jobVertex.getAggregatedUserAccumulatorsStringified();
+
+        this.id = jobVertex.getJobVertexId();
+        this.name = jobVertex.getJobVertex().getName();
+        this.parallelism = jobVertex.getParallelism();
+        this.maxParallelism = jobVertex.getMaxParallelism();
+
+        // the graph may have no stats tracker; archive an empty Option in that case
+        CheckpointStatsTracker tracker = jobVertex.getGraph().getCheckpointStatsTracker();
+        checkpointStats = tracker != null
+            ? tracker.getOperatorStats(this.id)
+            : Option.<OperatorCheckpointStats>empty();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //   Accessors
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public int getParallelism() {
+        return parallelism;
+    }
+
+    @Override
+    public int getMaxParallelism() {
+        return maxParallelism;
+    }
+
+    @Override
+    public JobVertexID getJobVertexId() {
+        return id;
+    }
+
+    /** Returns the archived task vertices (the internal array, not a copy). */
+    @Override
+    public ArchivedExecutionVertex[] getTaskVertices() {
+        return taskVertices;
+    }
+
+    /**
+     * Computes the aggregate state of this job vertex by counting how many task vertices are
+     * in each {@link ExecutionState} and handing those counts, together with the parallelism,
+     * to {@code ExecutionJobVertex.getAggregateJobVertexState}.
+     */
+    @Override
+    public ExecutionState getAggregateState() {
+        // histogram of subtask states, indexed by ExecutionState.ordinal()
+        int[] num = new int[ExecutionState.values().length];
+        for (ArchivedExecutionVertex vertex : this.taskVertices) {
+            num[vertex.getExecutionState().ordinal()]++;
+        }
+
+        return getAggregateJobVertexState(num, parallelism);
+    }
+
+    public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators() {
+        return this.aggregatedMetricAccumulators;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //   Checkpoint statistics and user accumulators
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public Option<OperatorCheckpointStats> getCheckpointStats() {
+        return checkpointStats;
+    }
+
+    @Override
+    public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
+        return archivedUserAccumulators;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
new file mode 100644
index 0000000..e1fb11a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializable snapshot of an {@link ExecutionVertex}, exposed through
+ * {@link AccessExecutionVertex}. Everything is read from the live vertex in the constructor.
+ */
+public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
+
+    private static final long serialVersionUID = -6708241535015028576L;
+
+    /** Parallel subtask index reported by the vertex. */
+    private final int subTaskIndex;
+
+    /** Archived forms of the vertex's prior executions, in the order the vertex returned them. */
+    private final List<ArchivedExecution> priorExecutions;
+
+    /** Task name including the subtask index, in the format "myTask (2/7)". */
+    private final String taskNameWithSubtask;
+
+    /** Archived form of the vertex's current execution attempt; must never be null. */
+    private final ArchivedExecution currentExecution;
+
+    /**
+     * Archives the given vertex together with its prior and current execution attempts.
+     *
+     * @param vertex the live vertex to take the snapshot from
+     */
+    public ArchivedExecutionVertex(ExecutionVertex vertex) {
+        this.subTaskIndex = vertex.getParallelSubtaskIndex();
+
+        final List<ArchivedExecution> archivedPriors = new ArrayList<>();
+        for (Execution earlier : vertex.getPriorExecutions()) {
+            archivedPriors.add(earlier.archive());
+        }
+        this.priorExecutions = archivedPriors;
+
+        this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
+        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //   Accessors
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public String getTaskNameWithSubtaskIndex() {
+        return taskNameWithSubtask;
+    }
+
+    @Override
+    public int getParallelSubtaskIndex() {
+        return subTaskIndex;
+    }
+
+    @Override
+    public ArchivedExecution getCurrentExecutionAttempt() {
+        return this.currentExecution;
+    }
+
+    /** Returns the state of the current execution attempt. */
+    @Override
+    public ExecutionState getExecutionState() {
+        return this.currentExecution.getState();
+    }
+
+    /** Returns the current execution attempt's timestamp for the given state. */
+    @Override
+    public long getStateTimestamp(ExecutionState state) {
+        return this.currentExecution.getStateTimestamp(state);
+    }
+
+    /** Returns the failure cause string of the current execution attempt. */
+    @Override
+    public String getFailureCauseAsString() {
+        return this.currentExecution.getFailureCauseAsString();
+    }
+
+    /** Returns the assigned resource location of the current execution attempt. */
+    @Override
+    public TaskManagerLocation getCurrentAssignedResourceLocation() {
+        return this.currentExecution.getAssignedResourceLocation();
+    }
+
+    /**
+     * Returns the prior execution stored at the given position.
+     *
+     * @param attemptNumber index into the list of prior executions
+     * @return the archived execution at that index
+     * @throws IllegalArgumentException if the index is negative or not smaller than the
+     *         number of prior executions
+     */
+    @Override
+    public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) {
+        final boolean outOfRange = attemptNumber < 0 || attemptNumber >= priorExecutions.size();
+        if (outOfRange) {
+            throw new IllegalArgumentException("attempt does not exist");
+        }
+        return priorExecutions.get(attemptNumber);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b92e3af..0b56931 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
import akka.dispatch.OnComplete;
import akka.dispatch.OnFailure;
+import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -102,7 +103,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
* actions if it is not. Many actions are also idempotent (like canceling).
*/
-public class Execution {
+public class Execution implements AccessExecution, Archiveable<ArchivedExecution> {
private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
@@ -188,14 +189,17 @@ public class Execution {
return vertex;
}
+ @Override
public ExecutionAttemptID getAttemptId() {
return attemptId;
}
+ @Override
public int getAttemptNumber() {
return attemptNumber;
}
+ @Override
public ExecutionState getState() {
return state;
}
@@ -204,6 +208,7 @@ public class Execution {
return assignedResource;
}
+ @Override
public TaskManagerLocation getAssignedResourceLocation() {
return assignedResourceLocation;
}
@@ -212,10 +217,17 @@ public class Execution {
return failureCause;
}
+ /** Returns the result of passing {@link #getFailureCause()} to {@code ExceptionUtils.stringifyException}. */
+ @Override
+ public String getFailureCauseAsString() {
+ return ExceptionUtils.stringifyException(getFailureCause());
+ }
+
+ @Override
public long[] getStateTimestamps() {
return stateTimestamps;
}
+ @Override
public long getStateTimestamp(ExecutionState state) {
return this.stateTimestamps[state.ordinal()];
}
@@ -237,21 +249,6 @@ public class Execution {
}
/**
- * This method cleans fields that are irrelevant for the archived execution attempt.
- */
- public void prepareForArchiving() {
- if (assignedResource != null && assignedResource.isAlive()) {
- throw new IllegalStateException("Cannot archive Execution while the assigned resource is still running.");
- }
- assignedResource = null;
-
- executionContext = null;
-
- partialInputChannelDeploymentDescriptors.clear();
- partialInputChannelDeploymentDescriptors = null;
- }
-
- /**
* Sets the initial state for the execution. The serialized state is then shipped via the
* {@link TaskDeploymentDescriptor} to the TaskManagers.
*
@@ -1055,14 +1052,21 @@ public class Execution {
return userAccumulators;
}
+ @Override
public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
return StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
}
+ @Override
public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
return flinkAccumulators;
}
+ /** Returns the parallel subtask index of the vertex returned by {@code getVertex()}. */
+ @Override
+ public int getParallelSubtaskIndex() {
+ return getVertex().getParallelSubtaskIndex();
+ }
+
// ------------------------------------------------------------------------
// Standard utilities
// ------------------------------------------------------------------------
@@ -1072,4 +1076,9 @@ public class Execution {
return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),
(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
}
+
+ /** Creates a new {@link ArchivedExecution} from this execution by handing it to the ArchivedExecution constructor. */
+ @Override
+ public ArchivedExecution archive() {
+ return new ArchivedExecution(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 10f0e88..aa9406c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
@@ -32,14 +33,16 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
-import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
+import org.apache.flink.api.common.Archiveable;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -58,8 +61,10 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
+import scala.Option;
import java.io.IOException;
import java.net.URL;
@@ -102,7 +107,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* address the message receiver.</li>
* </ul>
*/
-public class ExecutionGraph {
+public class ExecutionGraph implements AccessExecutionGraph, Archiveable<ArchivedExecutionGraph> {
private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
@@ -180,9 +185,6 @@ public class ExecutionGraph {
* from results than need to be materialized. */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
- /** Flag to indicate whether the Graph has been archived */
- private boolean isArchived = false;
-
// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
/** Current status of the job execution */
@@ -222,9 +224,6 @@ public class ExecutionGraph {
// ------ Fields that are only relevant for archived execution graphs ------------
private String jsonPlan;
- /** Serializable summary of all job config values, e.g. for web interface */
- private ExecutionConfigSummary executionConfigSummary;
-
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
@@ -304,16 +303,6 @@ public class ExecutionGraph {
metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
-
- // create a summary of all relevant data accessed in the web interface's JobConfigHandler
- try {
- ExecutionConfig executionConfig = serializedConfig.deserializeValue(userClassLoader);
- if (executionConfig != null) {
- this.executionConfigSummary = new ExecutionConfigSummary(executionConfig);
- }
- } catch (IOException | ClassNotFoundException e) {
- LOG.error("Couldn't create ExecutionConfigSummary for job {} ", jobID, e);
- }
}
// --------------------------------------------------------------------------------------------
@@ -344,8 +333,9 @@ public class ExecutionGraph {
return scheduleMode;
}
+ @Override
public boolean isArchived() {
- return isArchived;
+ return false;
}
public void enableSnapshotCheckpointing(
@@ -434,6 +424,7 @@ public class ExecutionGraph {
return restartStrategy;
}
+ @Override
public CheckpointStatsTracker getCheckpointStatsTracker() {
return checkpointStatsTracker;
}
@@ -484,6 +475,7 @@ public class ExecutionGraph {
this.jsonPlan = jsonPlan;
}
+ @Override
public String getJsonPlan() {
return jsonPlan;
}
@@ -492,14 +484,17 @@ public class ExecutionGraph {
return slotProvider;
}
+ @Override
public JobID getJobID() {
return jobID;
}
+ @Override
public String getJobName() {
return jobName;
}
+ @Override
public boolean isStoppable() {
return this.isStoppable;
}
@@ -512,6 +507,7 @@ public class ExecutionGraph {
return this.userClassLoader;
}
+ @Override
public JobStatus getState() {
return state;
}
@@ -520,14 +516,22 @@ public class ExecutionGraph {
return failureCause;
}
+ /** Returns the result of passing this graph's {@code failureCause} to {@code ExceptionUtils.stringifyException}. */
+ @Override
+ public String getFailureCauseAsString() {
+ return ExceptionUtils.stringifyException(failureCause);
+ }
+
+ @Override
public ExecutionJobVertex getJobVertex(JobVertexID id) {
return this.tasks.get(id);
}
+ @Override
public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
return Collections.unmodifiableMap(this.tasks);
}
+ @Override
public Iterable<ExecutionJobVertex> getVerticesTopologically() {
// we return a specific iterator that does not fail with concurrent modifications
// the list is append only, so it is safe for that
@@ -566,6 +570,7 @@ public class ExecutionGraph {
return Collections.unmodifiableMap(this.intermediateResults);
}
+ @Override
public Iterable<ExecutionVertex> getAllExecutionVertices() {
return new Iterable<ExecutionVertex>() {
@Override
@@ -575,6 +580,7 @@ public class ExecutionGraph {
};
}
+ @Override
public long getStatusTimestamp(JobStatus status) {
return this.stateTimestamps[status.ordinal()];
}
@@ -592,6 +598,7 @@ public class ExecutionGraph {
* Gets the internal flink accumulator map of maps which contains some metrics.
* @return A map of accumulators for every executed task.
*/
+ @Override
public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> getFlinkAccumulators() {
Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators =
new HashMap<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>();
@@ -627,6 +634,7 @@ public class ExecutionGraph {
* @return The accumulator map with serialized accumulator values.
* @throws IOException
*/
+ @Override
public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
@@ -643,6 +651,7 @@ public class ExecutionGraph {
* Returns the a stringified version of the user-defined accumulators.
* @return an Array containing the StringifiedAccumulatorResult objects
*/
+ @Override
public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
@@ -926,51 +935,21 @@ public class ExecutionGraph {
}
/**
- * This method cleans fields that are irrelevant for the archived execution attempt.
+ * Returns the serializable ArchivedExecutionConfig
+ * @return ArchivedExecutionConfig which may be null in case of errors
*/
- public void prepareForArchiving() {
- if (!state.isGloballyTerminalState()) {
- throw new IllegalStateException("Can only archive the job from a terminal state");
- }
-
- // clear the non-serializable fields
- restartStrategy = null;
- slotProvider = null;
- checkpointCoordinator = null;
- executionContext = null;
- kvStateLocationRegistry = null;
-
- for (ExecutionJobVertex vertex : verticesInCreationOrder) {
- vertex.prepareForArchiving();
- }
-
- intermediateResults.clear();
- currentExecutions.clear();
- requiredJarFiles.clear();
- requiredClasspaths.clear();
- jobStatusListeners.clear();
- executionListeners.clear();
-
- if (userClassLoader instanceof FlinkUserCodeClassLoader) {
- try {
- // close the classloader to free space of user jars immediately
- // otherwise we have to wait until garbage collection
- ((FlinkUserCodeClassLoader) userClassLoader).close();
- } catch (IOException e) {
- LOG.warn("Failed to close the user classloader for job {}", jobID, e);
+ @Override
+ public ArchivedExecutionConfig getArchivedExecutionConfig() {
+ // create a summary of all relevant data accessed in the web interface's JobConfigHandler
+ try {
+ // the execution config is only held in serialized form; deserialize it with the user class loader
+ ExecutionConfig executionConfig = getSerializedExecutionConfig().deserializeValue(userClassLoader);
+ if (executionConfig != null) {
+ return executionConfig.archive();
}
- }
- userClassLoader = null;
-
- isArchived = true;
- }
-
- /**
- * Returns the serializable ExecutionConfigSummary
- * @return ExecutionConfigSummary which may be null in case of errors
- */
- public ExecutionConfigSummary getExecutionConfigSummary() {
- return executionConfigSummary;
+ } catch (IOException | ClassNotFoundException e) {
+ // best effort: the failure is logged and reported to the caller as a null result
+ LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", jobID, e);
+ }
+ // deserialization failed or yielded no config
+ return null;
}
/**
@@ -1282,4 +1261,53 @@ public class ExecutionGraph {
}
}
}
+
+	/**
+	 * Builds an {@link ArchivedExecutionGraph} from the current contents of this graph.
+	 *
+	 * <p>Every job vertex is archived in creation order. Operator checkpoint statistics are
+	 * recorded for the vertices that report them, and job checkpoint statistics are taken from
+	 * the stats tracker when one is set. If the serialized user accumulators cannot be
+	 * obtained, a warning is logged and an empty map is archived instead.
+	 *
+	 * @return the archived view of this execution graph
+	 */
+	@Override
+	public ArchivedExecutionGraph archive() {
+		Map<JobVertexID, OperatorCheckpointStats> operatorStats = new HashMap<>();
+		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>();
+		List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>();
+		for (ExecutionJobVertex task : verticesInCreationOrder) {
+			ArchivedExecutionJobVertex archivedTask = task.archive();
+			archivedVerticesInCreationOrder.add(archivedTask);
+			archivedTasks.put(task.getJobVertexId(), archivedTask);
+
+			// only vertices that actually report checkpoint statistics get an entry
+			Option<OperatorCheckpointStats> statsOption = task.getCheckpointStats();
+			if (statsOption.isDefined()) {
+				operatorStats.put(task.getJobVertexId(), statsOption.get());
+			}
+		}
+
+		// read the tracker once so that the null check and the use refer to the same instance
+		CheckpointStatsTracker liveTracker = getCheckpointStatsTracker();
+		Option<JobCheckpointStats> jobStats;
+		if (liveTracker == null) {
+			jobStats = Option.empty();
+		} else {
+			jobStats = liveTracker.getJobStats();
+		}
+
+		ArchivedCheckpointStatsTracker statsTracker = new ArchivedCheckpointStatsTracker(jobStats, operatorStats);
+
+		Map<String, SerializedValue<Object>> serializedUserAccumulators;
+		try {
+			serializedUserAccumulators = getAccumulatorsSerialized();
+		} catch (Exception e) {
+			// archiving is best effort here: fall back to an empty accumulator map
+			LOG.warn("Error occurred while archiving user accumulators.", e);
+			serializedUserAccumulators = Collections.emptyMap();
+		}
+
+		return new ArchivedExecutionGraph(
+			getJobID(),
+			getJobName(),
+			archivedTasks,
+			archivedVerticesInCreationOrder,
+			// copy the timestamps so that later writes to this graph's array
+			// are not visible through the archived graph
+			stateTimestamps.clone(),
+			getState(),
+			getFailureCauseAsString(),
+			getJsonPlan(),
+			getAccumulatorResultsStringified(),
+			serializedUserAccumulators,
+			getArchivedExecutionConfig(),
+			isStoppable(),
+			statsTracker
+		);
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index ead0852..e7f16a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -28,7 +28,10 @@ import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.api.common.Archiveable;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -43,6 +46,7 @@ import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
+import scala.Option;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
@@ -51,7 +55,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ExecutionJobVertex {
+public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable<ArchivedExecutionJobVertex> {
/** Use the same log for all ExecutionGraph classes */
private static final Logger LOG = ExecutionGraph.LOG;
@@ -197,10 +201,17 @@ public class ExecutionJobVertex {
return jobVertex;
}
+	/**
+	 * Returns the name of the job vertex that this execution job vertex was created for.
+	 *
+	 * @return the name reported by the underlying job vertex
+	 */
+	@Override
+	public String getName() {
+		final JobVertex vertex = getJobVertex();
+		return vertex.getName();
+	}
+
+ @Override
public int getParallelism() {
return parallelism;
}
+ @Override
public int getMaxParallelism() {
return maxParallelism;
}
@@ -209,10 +220,12 @@ public class ExecutionJobVertex {
return graph.getJobID();
}
+ @Override
public JobVertexID getJobVertexId() {
return jobVertex.getID();
}
+ @Override
public ExecutionVertex[] getTaskVertices() {
return taskVertices;
}
@@ -241,6 +254,7 @@ public class ExecutionJobVertex {
return numSubtasksInFinalState == parallelism;
}
+ @Override
public ExecutionState getAggregateState() {
int[] num = new int[ExecutionState.values().length];
for (ExecutionVertex vertex : this.taskVertices) {
@@ -250,6 +264,16 @@ public class ExecutionJobVertex {
return getAggregateJobVertexState(num, parallelism);
}
+	/**
+	 * Looks up the checkpoint statistics of this job vertex in the graph's stats tracker.
+	 *
+	 * @return the operator statistics reported by the tracker, or an empty option when the
+	 *         graph has no stats tracker
+	 */
+	@Override
+	public Option<OperatorCheckpointStats> getCheckpointStats() {
+		final CheckpointStatsTracker statsTracker = getGraph().getCheckpointStatsTracker();
+		if (statsTracker != null) {
+			return statsTracker.getOperatorStats(getJobVertexId());
+		}
+		// no tracker registered on the graph, hence nothing to report
+		return Option.empty();
+	}
+
+
//---------------------------------------------------------------------------------------------
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
@@ -371,36 +395,6 @@ public class ExecutionJobVertex {
}
}
- /**
- * This method cleans fields that are irrelevant for the archived execution attempt.
- */
- public void prepareForArchiving() {
-
- for (ExecutionVertex vertex : taskVertices) {
- vertex.prepareForArchiving();
- }
-
- // clear intermediate results
- inputs.clear();
- producedDataSets = null;
-
- // reset shared groups
- if (slotSharingGroup != null) {
- slotSharingGroup.clearTaskAssignment();
- }
- if (coLocationGroup != null) {
- coLocationGroup.resetConstraints();
- }
-
- // reset splits and split assigner
- splitAssigner = null;
- if (inputSplits != null) {
- for (int i = 0; i < inputSplits.length; i++) {
- inputSplits[i] = null;
- }
- }
- }
-
//---------------------------------------------------------------------------------------------
// Notifications
//---------------------------------------------------------------------------------------------
@@ -627,4 +621,9 @@ public class ExecutionJobVertex {
return ExecutionState.CREATED;
}
}
+
+	/**
+	 * Creates the archived counterpart of this job vertex.
+	 *
+	 * @return a new {@link ArchivedExecutionJobVertex} constructed from this instance
+	 */
+	@Override
+	public ArchivedExecutionJobVertex archive() {
+		final ArchivedExecutionJobVertex archived = new ArchivedExecutionJobVertex(this);
+		return archived;
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 4837803..96af91e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.api.common.Archiveable;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -72,7 +74,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
* The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of
* which time it spawns an {@link Execution}.
*/
-public class ExecutionVertex {
+public class ExecutionVertex implements AccessExecutionVertex, Archiveable<ArchivedExecutionVertex> {
private static final Logger LOG = ExecutionGraph.LOG;
@@ -176,6 +178,7 @@ public class ExecutionVertex {
return this.jobVertex.getJobVertex().getName();
}
+ @Override
public String getTaskNameWithSubtaskIndex() {
return this.taskNameWithSubtask;
}
@@ -188,6 +191,7 @@ public class ExecutionVertex {
return this.jobVertex.getMaxParallelism();
}
+ @Override
public int getParallelSubtaskIndex() {
return this.subTaskIndex;
}
@@ -207,18 +211,26 @@ public class ExecutionVertex {
return locationConstraint;
}
+ @Override
public Execution getCurrentExecutionAttempt() {
return currentExecution;
}
+ @Override
public ExecutionState getExecutionState() {
return currentExecution.getState();
}
+ @Override
public long getStateTimestamp(ExecutionState state) {
return currentExecution.getStateTimestamp(state);
}
+	/**
+	 * Returns the failure cause of this vertex, as given by {@link #getFailureCause()},
+	 * converted to a string via {@link ExceptionUtils#stringifyException(Throwable)}.
+	 *
+	 * @return the stringified failure cause
+	 */
+	@Override
+	public String getFailureCauseAsString() {
+		final Throwable cause = getFailureCause();
+		return ExceptionUtils.stringifyException(cause);
+	}
+
public Throwable getFailureCause() {
return currentExecution.getFailureCause();
}
@@ -227,10 +239,12 @@ public class ExecutionVertex {
return currentExecution.getAssignedResource();
}
+ @Override
public TaskManagerLocation getCurrentAssignedResourceLocation() {
return currentExecution.getAssignedResourceLocation();
}
+ @Override
public Execution getPriorExecutionAttempt(int attemptNumber) {
if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
return priorExecutions.get(attemptNumber);
@@ -240,6 +254,10 @@ public class ExecutionVertex {
}
}
+	/**
+	 * Gives package-level access to the prior executions of this vertex.
+	 *
+	 * @return the internal list of prior executions (the list itself, not a copy)
+	 */
+	List<Execution> getPriorExecutions() {
+		return this.priorExecutions;
+	}
+
public ExecutionGraph getExecutionGraph() {
return this.jobVertex.getGraph();
}
@@ -537,31 +555,6 @@ public class ExecutionVertex {
}
}
- /**
- * This method cleans fields that are irrelevant for the archived execution attempt.
- */
- public void prepareForArchiving() throws IllegalStateException {
- Execution execution = currentExecution;
-
- // sanity check
- if (!execution.isFinished()) {
- throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state.");
- }
-
- // prepare the current execution for archiving
- execution.prepareForArchiving();
-
- // prepare previous executions for archiving
- for (Execution exec : priorExecutions) {
- exec.prepareForArchiving();
- }
-
- // clear the unnecessary fields in this class
- this.resultPartitions = null;
- this.inputEdges = null;
- this.locationConstraint = null;
- }
-
public void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo){
getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo);
}
@@ -708,4 +701,9 @@ public class ExecutionVertex {
public String toString() {
return getSimpleName();
}
+
+	/**
+	 * Creates the archived counterpart of this execution vertex.
+	 *
+	 * @return a new {@link ArchivedExecutionVertex} constructed from this instance
+	 */
+	@Override
+	public ArchivedExecutionVertex archive() {
+		final ArchivedExecutionVertex archived = new ArchivedExecutionVertex(this);
+		return archived;
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
deleted file mode 100644
index ad4677f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.executiongraph.archive;
-
-import org.apache.flink.api.common.ExecutionConfig;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Serializable class which is created when archiving the job.
- * It can be used to display job information on the web interface
- * without having to keep the classloader around after job completion.
- */
-public class ExecutionConfigSummary implements Serializable {
-
- private final String executionMode;
- private final String restartStrategyDescription;
- private final int parallelism;
- private final boolean objectReuseEnabled;
- private final Map<String, String> globalJobParameters;
-
- public ExecutionConfigSummary(ExecutionConfig ec) {
- executionMode = ec.getExecutionMode().name();
- if (ec.getRestartStrategy() != null) {
- restartStrategyDescription = ec.getRestartStrategy().getDescription();
- } else {
- restartStrategyDescription = "default";
- }
- parallelism = ec.getParallelism();
- objectReuseEnabled = ec.isObjectReuseEnabled();
- if (ec.getGlobalJobParameters() != null
- && ec.getGlobalJobParameters().toMap() != null) {
- globalJobParameters = ec.getGlobalJobParameters().toMap();
- } else {
- globalJobParameters = Collections.emptyMap();
- }
- }
-
- public String getExecutionMode() {
- return executionMode;
- }
-
- public String getRestartStrategyDescription() {
- return restartStrategyDescription;
- }
-
- public int getParallelism() {
- return parallelism;
- }
-
- public boolean getObjectReuseEnabled() {
- return objectReuseEnabled;
- }
-
- public Map<String, String> getGlobalJobParameters() {
- return globalJobParameters;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 37a91b3..87df0d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -27,9 +27,9 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -164,7 +164,7 @@ public final class WebMonitorUtils {
}
}
- public static JobDetails createDetailsForJob(ExecutionGraph job) {
+ public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
JobStatus status = job.getState();
long started = job.getStatusTimestamp(JobStatus.CREATED);
@@ -174,11 +174,11 @@ public final class WebMonitorUtils {
long lastChanged = 0;
int numTotalTasks = 0;
- for (ExecutionJobVertex ejv : job.getVerticesTopologically()) {
- ExecutionVertex[] vertices = ejv.getTaskVertices();
+ for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
+ AccessExecutionVertex[] vertices = ejv.getTaskVertices();
numTotalTasks += vertices.length;
- for (ExecutionVertex vertex : vertices) {
+ for (AccessExecutionVertex vertex : vertices) {
ExecutionState state = vertex.getExecutionState();
countsPerStatus[state.ordinal()]++;
lastChanged = Math.max(lastChanged, vertex.getStateTimestamp(state));
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cca0124..8f3b82a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1685,12 +1685,9 @@ class JobManager(
}(context.dispatcher))
try {
- eg.prepareForArchiving()
-
- archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
+ archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg.archive()))
} catch {
- case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
- "archiving.", t)
+ case t: Throwable => log.error(s"Could not archive the execution graph $eg.", t)
}
futureOption
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 2d55b26..7f8fcd3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.webmonitor.WebMonitorUtils
import org.apache.flink.runtime.{FlinkActor, LogMessages}
import org.apache.flink.runtime.messages.webmonitor._
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, ExecutionGraph}
import org.apache.flink.runtime.messages.ArchiveMessages._
import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -66,7 +66,7 @@ class MemoryArchivist(private val max_entries: Int)
* Map of execution graphs belonging to recently started jobs with the time stamp of the last
* received job event. The insert order is preserved through a LinkedHashMap.
*/
- protected val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()
+ protected val graphs = mutable.LinkedHashMap[JobID, ArchivedExecutionGraph]()
/* Counters for finished, canceled, and failed jobs */
private[this] var finishedCnt: Int = 0
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index c4e3f3e..435b736 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -19,14 +19,14 @@
package org.apache.flink.runtime.messages
import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, ExecutionGraph}
/**
* This object contains the archive specific messages.
*/
object ArchiveMessages {
- case class ArchiveExecutionGraph(jobID: JobID, graph: ExecutionGraph)
+ case class ArchiveExecutionGraph(jobID: JobID, graph: ArchivedExecutionGraph)
/**
* Request the currently archived jobs in the archiver. The resulting response is [[ArchivedJobs]]
@@ -44,19 +44,19 @@ object ArchiveMessages {
*/
case class RequestArchivedJob(jobID: JobID)
- case class ArchivedJob(job: Option[ExecutionGraph])
+ case class ArchivedJob(job: Option[ArchivedExecutionGraph])
/**
* Response to [[RequestArchivedJobs]] message. The response contains the archived jobs.
* @param jobs
*/
- case class ArchivedJobs(jobs: Iterable[ExecutionGraph]){
- def asJavaIterable: java.lang.Iterable[ExecutionGraph] = {
+ case class ArchivedJobs(jobs: Iterable[ArchivedExecutionGraph]){
+ def asJavaIterable: java.lang.Iterable[ArchivedExecutionGraph] = {
import scala.collection.JavaConverters._
jobs.asJava
}
- def asJavaCollection: java.util.Collection[ExecutionGraph] = {
+ def asJavaCollection: java.util.Collection[ArchivedExecutionGraph] = {
import scala.collection.JavaConverters._
jobs.asJavaCollection
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 4cf6a02..3df8c26 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.blob.BlobKey
import org.apache.flink.runtime.client.{JobStatusMessage, SerializedJobExecutionResult}
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph}
import org.apache.flink.runtime.instance.{Instance, InstanceID}
import org.apache.flink.runtime.io.network.partition.ResultPartitionID
import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, JobStatus, JobVertexID}
@@ -371,7 +371,7 @@ object JobManagerMessages {
* @param jobID
* @param executionGraph
*/
- case class JobFound(jobID: JobID, executionGraph: ExecutionGraph) extends JobResponse
+ case class JobFound(jobID: JobID, executionGraph: AccessExecutionGraph) extends JobResponse
/**
* Denotes that there is no job with [[jobID]] retrievable. This message can be the response of
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index ea4d322..0b2f4f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -81,7 +81,7 @@ public class CoordinatorShutdownTest {
new JobManagerMessages.RequestJob(testGraph.getJobID()),
timeout);
- ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
+ ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
assertNotNull(graph);
graph.waitUntilFinished();
@@ -133,7 +133,7 @@ public class CoordinatorShutdownTest {
new JobManagerMessages.RequestJob(testGraph.getJobID()),
timeout);
- ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
+ ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
assertNotNull(graph);
graph.waitUntilFinished();