You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/03/07 11:42:41 UTC
flink git commit: [FLINK-5645] Store accumulators/metrics for canceled/failed tasks
Repository: flink
Updated Branches:
refs/heads/master 746c1efa5 -> 821da81fe
[FLINK-5645] Store accumulators/metrics for canceled/failed tasks
This closes #3377.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/821da81f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/821da81f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/821da81f
Branch: refs/heads/master
Commit: 821da81fe6bee4c5a33be19c08064491fd6280de
Parents: 746c1ef
Author: zentol <ch...@apache.org>
Authored: Tue Feb 21 12:36:17 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Mar 7 10:20:43 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 37 ++++++--
.../runtime/executiongraph/ExecutionGraph.java | 23 ++++-
.../flink/runtime/executiongraph/IOMetrics.java | 13 ++-
.../ExecutionGraphDeploymentTest.java | 98 ++++++++++++++++++--
4 files changed, 150 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/821da81f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 3191d76..e17a3e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -706,6 +706,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
processFail(t, true);
}
+ void markFailed(Throwable t, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) { // Overload that also hands the reported user accumulators and I/O metrics to the failure handling.
+ processFail(t, true, userAccumulators, metrics); // 'true' is passed as processFail's isCallback argument
+ }
+
void markFinished() {
markFinished(null, null);
}
@@ -731,10 +735,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
}
- synchronized (accumulatorLock) {
- this.userAccumulators = userAccumulators;
- }
- this.ioMetrics = metrics;
+ updateAccumulatorsAndMetrics(userAccumulators, metrics);
assignedResource.releaseSlot();
vertex.getExecutionGraph().deregisterExecution(this);
@@ -748,7 +749,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
else if (current == CANCELING) {
// we sent a cancel call, and the task manager finished before it arrived. We
// will never get a CANCELED call back from the job manager
- cancelingComplete();
+ cancelingComplete(userAccumulators, metrics);
return;
}
else if (current == CANCELED || current == FAILED) {
@@ -766,6 +767,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
void cancelingComplete() { // No-argument variant for callers that have no accumulators or metrics to report.
+ cancelingComplete(null, null); // delegates to the (userAccumulators, metrics) overload, passing null for both
+ }
+
+ void cancelingComplete(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
// the taskmanagers can themselves cancel tasks without an external trigger, if they find that the
// network stack is canceled (for example by a failing / canceling receiver or sender
@@ -779,6 +784,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
return;
}
else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
+
+ updateAccumulatorsAndMetrics(userAccumulators, metrics);
+
if (transitionState(current, CANCELED)) {
try {
assignedResource.releaseSlot();
@@ -834,6 +842,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// --------------------------------------------------------------------------------------------
private boolean processFail(Throwable t, boolean isCallback) { // Convenience variant without accumulators/metrics; returns whatever the four-argument overload returns.
+ return processFail(t, isCallback, null, null); // null userAccumulators and null metrics
+ }
+
+ private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
@@ -857,7 +869,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
if (current == CANCELING) {
- cancelingComplete();
+ cancelingComplete(userAccumulators, metrics);
return false;
}
@@ -865,6 +877,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// success (in a manner of speaking)
this.failureCause = t;
+ updateAccumulatorsAndMetrics(userAccumulators, metrics);
+
try {
if (assignedResource != null) {
assignedResource.releaseSlot();
@@ -1093,6 +1107,17 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
return ioMetrics;
}
+ private void updateAccumulatorsAndMetrics(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) { // Stores the given accumulators and I/O metrics on this execution; a null argument leaves the corresponding field untouched.
+ if (userAccumulators != null) {
+ synchronized (accumulatorLock) { // the accumulator map reference is replaced while holding accumulatorLock
+ this.userAccumulators = userAccumulators;
+ }
+ }
+ if (metrics != null) {
+ this.ioMetrics = metrics; // assigned outside the synchronized block, as in the code this helper replaces in markFinished
+ }
+ }
+
// ------------------------------------------------------------------------
// Standard utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/821da81f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5fa40fc..6bb3455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1281,9 +1281,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return attempt.switchToRunning();
case FINISHED:
try {
- AccumulatorSnapshot accumulators = state.getAccumulators();
- Map<String, Accumulator<?, ?>> userAccumulators =
- accumulators.deserializeUserAccumulators(userClassLoader);
+ Map<String, Accumulator<?, ?>> userAccumulators = deserializeAccumulators(state);
attempt.markFinished(userAccumulators, state.getIOMetrics());
}
catch (Exception e) {
@@ -1292,10 +1290,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
return true;
case CANCELED:
- attempt.cancelingComplete();
+ Map<String, Accumulator<?, ?>> userAcc1 = deserializeAccumulators(state);
+ attempt.cancelingComplete(userAcc1, state.getIOMetrics());
return true;
case FAILED:
- attempt.markFailed(state.getError(userClassLoader));
+ Map<String, Accumulator<?, ?>> userAcc2 = deserializeAccumulators(state);
+ attempt.markFailed(state.getError(userClassLoader), userAcc2, state.getIOMetrics());
return true;
default:
// we mark as failed and return false, which triggers the TaskManager
@@ -1309,6 +1309,19 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
+ private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionState state) { // Deserializes the user accumulators carried by the state update; returns null when the state has no snapshot or deserialization throws.
+ AccumulatorSnapshot serializedAccumulators = state.getAccumulators();
+ Map<String, Accumulator<?, ?>> accumulators = null; // stays null unless deserialization below succeeds
+ if (serializedAccumulators != null) {
+ try {
+ accumulators = serializedAccumulators.deserializeUserAccumulators(userClassLoader);
+ } catch (Exception e) {
+ LOG.error("Failed to deserialize final accumulator results.", e); // logged only, not rethrown; the caller receives null
+ }
+ }
+ return accumulators;
+ }
+
/**
* Schedule or updates consumers of the given result partition.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/821da81f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index e0472ba..82c376e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -54,9 +54,16 @@ public class IOMetrics implements Serializable {
}
public IOMetrics(
- int numBytesInLocal, int numBytesInRemote, int numBytesOut, int numRecordsIn, int numRecordsOut,
- double numBytesInLocalPerSecond, double numBytesInRemotePerSecond, double numBytesOutPerSecond,
- double numRecordsInPerSecond, double numRecordsOutPerSecond) {
+ int numBytesInLocal,
+ int numBytesInRemote,
+ int numBytesOut,
+ int numRecordsIn,
+ int numRecordsOut,
+ double numBytesInLocalPerSecond,
+ double numBytesInRemotePerSecond,
+ double numBytesOutPerSecond,
+ double numRecordsInPerSecond,
+ double numRecordsOutPerSecond) {
this.numBytesInLocal = numBytesInLocal;
this.numBytesInRemote = numBytesInRemote;
this.numBytesOut = numBytesOut;
http://git-wip-us.apache.org/repos/asf/flink/blob/821da81f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index f119671..3d2913f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -28,12 +28,17 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -160,7 +165,7 @@ public class ExecutionGraphDeploymentTest {
JobVertex v1 = new JobVertex("v1", jid1);
JobVertex v2 = new JobVertex("v2", jid2);
- Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7650, v2, 2350);
+ Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7650, v2, 2350).f1;
for (Execution e : executions.values()) {
e.markFinished();
@@ -184,7 +189,7 @@ public class ExecutionGraphDeploymentTest {
JobVertex v1 = new JobVertex("v1", jid1);
JobVertex v2 = new JobVertex("v2", jid2);
- Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);
+ Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6).f1;
for (Execution e : executions.values()) {
e.markFailed(null);
@@ -208,7 +213,7 @@ public class ExecutionGraphDeploymentTest {
JobVertex v1 = new JobVertex("v1", jid1);
JobVertex v2 = new JobVertex("v2", jid2);
- Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);
+ Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6).f1;
for (Execution e : executions.values()) {
e.fail(null);
@@ -222,6 +227,85 @@ public class ExecutionGraphDeploymentTest {
}
}
+ /**
+ * Verifies that {@link ExecutionGraph#updateState(TaskExecutionState)} updates the accumulators and metrics for an
+ * execution that failed or was canceled.
+ *
+ * Each case sends a {@link TaskExecutionState} carrying an {@link AccumulatorSnapshot} and {@link IOMetrics} to the
+ * graph and then checks the values exposed by the execution's getters.
+ */
+ @Test
+ public void testAccumulatorsAndMetricsForwarding() throws Exception {
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+
+ JobVertex v1 = new JobVertex("v1", jid1);
+ JobVertex v2 = new JobVertex("v2", jid2);
+
+ Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphAndExecutions = setupExecution(v1, 1, v2, 1); // parallelism 1 for each of the two vertices
+ ExecutionGraph graph = graphAndExecutions.f0;
+
+ // verify behavior for canceled executions
+ Execution execution1 = graphAndExecutions.f1.values().iterator().next();
+
+ IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
+ Map<String, Accumulator<?, ?>> accumulators = new HashMap<>();
+ accumulators.put("acc", new IntCounter(4)); // 4 is the value asserted after the update
+ AccumulatorSnapshot accumulatorSnapshot = new AccumulatorSnapshot(graph.getJobID(), execution1.getAttemptId(), accumulators);
+
+ TaskExecutionState state = new TaskExecutionState(graph.getJobID(), execution1.getAttemptId(), ExecutionState.CANCELED, null, accumulatorSnapshot, ioMetrics); // fourth argument is null - presumably the error; confirm against TaskExecutionState
+
+ graph.updateState(state); // call under test for the CANCELED case
+
+ assertEquals(ioMetrics, execution1.getIOMetrics());
+ assertNotNull(execution1.getUserAccumulators());
+ assertEquals(4, execution1.getUserAccumulators().get("acc").getLocalValue());
+
+ // verify behavior for failed executions
+ Execution execution2 = graphAndExecutions.f1.values().iterator().next(); // NOTE(review): same expression as for execution1; presumably yields a different execution because the first one is no longer in the map - confirm
+
+ IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
+ Map<String, Accumulator<?, ?>> accumulators2 = new HashMap<>();
+ accumulators2.put("acc", new IntCounter(8)); // 8 is the value asserted after the update
+ AccumulatorSnapshot accumulatorSnapshot2 = new AccumulatorSnapshot(graph.getJobID(), execution2.getAttemptId(), accumulators2);
+
+ TaskExecutionState state2 = new TaskExecutionState(graph.getJobID(), execution2.getAttemptId(), ExecutionState.FAILED, null, accumulatorSnapshot2, ioMetrics2);
+
+ graph.updateState(state2); // call under test for the FAILED case
+
+ assertEquals(ioMetrics2, execution2.getIOMetrics());
+ assertNotNull(execution2.getUserAccumulators());
+ assertEquals(8, execution2.getUserAccumulators().get("acc").getLocalValue());
+ }
+
+ /**
+ * Verifies that {@link Execution#cancelingComplete(Map, IOMetrics)} and {@link Execution#markFailed(Throwable, Map, IOMetrics)}
+ * store the given accumulators and metrics correctly.
+ *
+ * The canceled case first calls {@code cancel()} and then {@code cancelingComplete}; the failed case calls
+ * {@code markFailed} directly. Both cases pass the same empty accumulator map and the same metrics instance.
+ */
+ @Test
+ public void testAccumulatorsAndMetricsStorage() throws Exception {
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+
+ JobVertex v1 = new JobVertex("v1", jid1);
+ JobVertex v2 = new JobVertex("v2", jid2);
+
+ Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 1, v2, 1).f1; // only the executions map (f1) is needed here
+
+ IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
+ Map<String, Accumulator<?, ?>> accumulators = Collections.emptyMap();
+
+ Execution execution1 = executions.values().iterator().next();
+ execution1.cancel();
+ execution1.cancelingComplete(accumulators, ioMetrics);
+
+ assertEquals(ioMetrics, execution1.getIOMetrics());
+ assertEquals(accumulators, execution1.getUserAccumulators());
+
+ Execution execution2 = executions.values().iterator().next(); // NOTE(review): same expression as for execution1; presumably yields a different execution because the first one is no longer in the map - confirm
+ execution2.markFailed(new Throwable(), accumulators, ioMetrics);
+
+ assertEquals(ioMetrics, execution2.getIOMetrics());
+ assertEquals(accumulators, execution2.getUserAccumulators());
+ }
+
@Test
public void testRegistrationOfExecutionsCanceled() {
try {
@@ -232,7 +316,7 @@ public class ExecutionGraphDeploymentTest {
JobVertex v1 = new JobVertex("v1", jid1);
JobVertex v2 = new JobVertex("v2", jid2);
- Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 19, v2, 37);
+ Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 19, v2, 37).f1;
for (Execution e : executions.values()) {
e.cancel();
@@ -257,7 +341,7 @@ public class ExecutionGraphDeploymentTest {
JobVertex v1 = new FailingFinalizeJobVertex("v1", jid1);
JobVertex v2 = new JobVertex("v2", jid2);
- Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 6, v2, 4);
+ Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 6, v2, 4).f1;
List<Execution> execList = new ArrayList<Execution>();
execList.addAll(executions.values());
@@ -351,7 +435,7 @@ public class ExecutionGraphDeploymentTest {
assertEquals(JobStatus.FAILED, eg.getState());
}
- private Map<ExecutionAttemptID, Execution> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
+ private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
final JobID jobId = new JobID();
v1.setParallelism(dop1);
@@ -394,7 +478,7 @@ public class ExecutionGraphDeploymentTest {
Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions();
assertEquals(dop1 + dop2, executions.size());
- return executions;
+ return new Tuple2<>(eg, executions);
}
@SuppressWarnings("serial")