Posted to commits@flink.apache.org by ch...@apache.org on 2017/03/07 11:42:41 UTC

flink git commit: [FLINK-5645] Store accumulators/metrics for canceled/failed tasks

Repository: flink
Updated Branches:
  refs/heads/master 746c1efa5 -> 821da81fe


[FLINK-5645] Store accumulators/metrics for canceled/failed tasks

This closes #3377.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/821da81f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/821da81f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/821da81f

Branch: refs/heads/master
Commit: 821da81fe6bee4c5a33be19c08064491fd6280de
Parents: 746c1ef
Author: zentol <ch...@apache.org>
Authored: Tue Feb 21 12:36:17 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Mar 7 10:20:43 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 37 ++++++--
 .../runtime/executiongraph/ExecutionGraph.java  | 23 ++++-
 .../flink/runtime/executiongraph/IOMetrics.java | 13 ++-
 .../ExecutionGraphDeploymentTest.java           | 98 ++++++++++++++++++--
 4 files changed, 150 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
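
The gist of the change: final user accumulators and IO metrics were
previously stored only when a task reached FINISHED, so canceled and
failed tasks lost them. The diffs below thread both through the CANCELED
and FAILED paths as well. For orientation, a minimal standalone sketch of
the resulting dispatch; SimpleExecution, State, and the accumulator/metric
types here are hypothetical stand-ins, not the Flink classes:

    import java.util.HashMap;
    import java.util.Map;

    public class StateUpdateSketch {

        enum State { FINISHED, CANCELED, FAILED }

        static class SimpleExecution {
            Map<String, Integer> userAccumulators;
            double[] ioMetrics;

            void markFinished(Map<String, Integer> acc, double[] m) { store(acc, m); }
            void cancelingComplete(Map<String, Integer> acc, double[] m) { store(acc, m); }
            void markFailed(Throwable t, Map<String, Integer> acc, double[] m) { store(acc, m); }

            // null means "no new data"; never overwrite stored results with nothing
            private void store(Map<String, Integer> acc, double[] m) {
                if (acc != null) { this.userAccumulators = acc; }
                if (m != null) { this.ioMetrics = m; }
            }
        }

        public static void main(String[] args) {
            SimpleExecution attempt = new SimpleExecution();
            Map<String, Integer> acc = new HashMap<>();
            acc.put("records-seen", 42);
            double[] metrics = {0.0, 0.0, 0.0, 0.0, 0.0};

            // before this commit, only the FINISHED branch carried final results;
            // now every terminal transition does
            State reported = State.FAILED;
            switch (reported) {
                case FINISHED:
                    attempt.markFinished(acc, metrics);
                    break;
                case CANCELED:
                    attempt.cancelingComplete(acc, metrics);
                    break;
                case FAILED:
                    attempt.markFailed(new RuntimeException("task failed"), acc, metrics);
                    break;
            }
            System.out.println("stored accumulators: " + attempt.userAccumulators);
        }
    }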


http://git-wip-us.apache.org/repos/asf/flink/blob/821da81f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 3191d76..e17a3e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -706,6 +706,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		processFail(t, true);
 	}
 
+	void markFailed(Throwable t, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
+		processFail(t, true, userAccumulators, metrics);
+	}
+
 	void markFinished() {
 		markFinished(null, null);
 	}
@@ -731,10 +735,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 							}
 						}
 
-						synchronized (accumulatorLock) {
-							this.userAccumulators = userAccumulators;
-						}
-						this.ioMetrics = metrics;
+						updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
 						assignedResource.releaseSlot();
 						vertex.getExecutionGraph().deregisterExecution(this);
@@ -748,7 +749,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			else if (current == CANCELING) {
 				// we sent a cancel call, and the task manager finished before it arrived. We
 				// will never get a CANCELED call back from the job manager
-				cancelingComplete();
+				cancelingComplete(userAccumulators, metrics);
 				return;
 			}
 			else if (current == CANCELED || current == FAILED) {
@@ -766,6 +767,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	void cancelingComplete() {
+		cancelingComplete(null, null);
+	}
+	
+	void cancelingComplete(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
 
 		// the taskmanagers can themselves cancel tasks without an external trigger, if they find that the
 		// network stack is canceled (for example by a failing / canceling receiver or sender
@@ -779,6 +784,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				return;
 			}
 			else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
+
+				updateAccumulatorsAndMetrics(userAccumulators, metrics);
+
 				if (transitionState(current, CANCELED)) {
 					try {
 						assignedResource.releaseSlot();
@@ -834,6 +842,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	// --------------------------------------------------------------------------------------------
 
 	private boolean processFail(Throwable t, boolean isCallback) {
+		return processFail(t, isCallback, null, null);
+	}
+
+	private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
 
 		// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
 		// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
@@ -857,7 +869,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			}
 
 			if (current == CANCELING) {
-				cancelingComplete();
+				cancelingComplete(userAccumulators, metrics);
 				return false;
 			}
 
@@ -865,6 +877,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				// success (in a manner of speaking)
 				this.failureCause = t;
 
+				updateAccumulatorsAndMetrics(userAccumulators, metrics);
+
 				try {
 					if (assignedResource != null) {
 						assignedResource.releaseSlot();
@@ -1093,6 +1107,17 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		return ioMetrics;
 	}
 
+	private void updateAccumulatorsAndMetrics(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
+		if (userAccumulators != null) {
+			synchronized (accumulatorLock) {
+				this.userAccumulators = userAccumulators;
+			}
+		}
+		if (metrics != null) {
+			this.ioMetrics = metrics;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Standard utilities
 	// ------------------------------------------------------------------------
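
The helper added at the bottom of this file treats null as "no new data",
so the argument-less cancelingComplete() and the two-argument processFail()
cannot wipe results that an earlier call already stored, and the accumulator
map is swapped under accumulatorLock because it may be read concurrently by
other threads. A minimal sketch of the pattern with illustrative, non-Flink
types:

    import java.util.Map;

    public class GuardedUpdateSketch {

        private final Object accumulatorLock = new Object();
        private Map<String, Long> userAccumulators;
        private long[] ioMetrics;

        void updateAccumulatorsAndMetrics(Map<String, Long> userAccumulators, long[] metrics) {
            if (userAccumulators != null) {
                // the map can be read concurrently, so swap it under the lock
                synchronized (accumulatorLock) {
                    this.userAccumulators = userAccumulators;
                }
            }
            if (metrics != null) {
                // plain reference swap for the metrics
                this.ioMetrics = metrics;
            }
        }
    }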

http://git-wip-us.apache.org/repos/asf/flink/blob/821da81f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5fa40fc..6bb3455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1281,9 +1281,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					return attempt.switchToRunning();
 				case FINISHED:
 					try {
-						AccumulatorSnapshot accumulators = state.getAccumulators();
-						Map<String, Accumulator<?, ?>> userAccumulators =
-							accumulators.deserializeUserAccumulators(userClassLoader);
+						Map<String, Accumulator<?, ?>> userAccumulators = deserializeAccumulators(state);
 						attempt.markFinished(userAccumulators, state.getIOMetrics());
 					}
 					catch (Exception e) {
@@ -1292,10 +1290,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					}
 					return true;
 				case CANCELED:
-					attempt.cancelingComplete();
+					Map<String, Accumulator<?, ?>> userAcc1 = deserializeAccumulators(state);
+					attempt.cancelingComplete(userAcc1, state.getIOMetrics());
 					return true;
 				case FAILED:
-					attempt.markFailed(state.getError(userClassLoader));
+					Map<String, Accumulator<?, ?>> userAcc2 = deserializeAccumulators(state);
+					attempt.markFailed(state.getError(userClassLoader), userAcc2, state.getIOMetrics());
 					return true;
 				default:
 					// we mark as failed and return false, which triggers the TaskManager
@@ -1309,6 +1309,19 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
+	private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionState state) {
+		AccumulatorSnapshot serializedAccumulators = state.getAccumulators();
+		Map<String, Accumulator<?, ?>> accumulators = null;
+		if (serializedAccumulators != null) {
+			try {
+				accumulators = serializedAccumulators.deserializeUserAccumulators(userClassLoader);
+			} catch (Exception e) {
+				LOG.error("Failed to deserialize final accumulator results.", e);
+			}
+		}
+		return accumulators;
+	}
+
 	/**
 	 * Schedules or updates consumers of the given result partition.
 	 *
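
deserializeAccumulators() deliberately logs and degrades instead of
throwing: a missing or undeserializable accumulator snapshot must not
prevent the CANCELED/FAILED transition, so the method falls back to null,
which Execution's update helper above treats as "no data". A standalone
sketch of the same pattern; the Snapshot interface and the
java.util.logging setup are illustrative assumptions, not Flink API:

    import java.util.Map;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class SafeDeserializeSketch {

        private static final Logger LOG =
            Logger.getLogger(SafeDeserializeSketch.class.getName());

        // stand-in for AccumulatorSnapshot
        interface Snapshot {
            Map<String, Object> deserialize(ClassLoader cl) throws Exception;
        }

        static Map<String, Object> deserializeAccumulators(Snapshot snapshot, ClassLoader userClassLoader) {
            Map<String, Object> accumulators = null;
            if (snapshot != null) {
                try {
                    accumulators = snapshot.deserialize(userClassLoader);
                } catch (Exception e) {
                    // log, but never block the terminal state transition
                    LOG.log(Level.SEVERE, "Failed to deserialize final accumulator results.", e);
                }
            }
            return accumulators;
        }
    }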

http://git-wip-us.apache.org/repos/asf/flink/blob/821da81f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index e0472ba..82c376e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -54,9 +54,16 @@ public class IOMetrics implements Serializable {
 	}
 
 	public IOMetrics(
-			int numBytesInLocal, int numBytesInRemote, int numBytesOut, int numRecordsIn, int numRecordsOut,
-			double numBytesInLocalPerSecond, double numBytesInRemotePerSecond, double numBytesOutPerSecond,
-			double numRecordsInPerSecond, double numRecordsOutPerSecond) {
+			int numBytesInLocal,
+			int numBytesInRemote,
+			int numBytesOut,
+			int numRecordsIn,
+			int numRecordsOut,
+			double numBytesInLocalPerSecond,
+			double numBytesInRemotePerSecond,
+			double numBytesOutPerSecond,
+			double numRecordsInPerSecond,
+			double numRecordsOutPerSecond) {
 		this.numBytesInLocal = numBytesInLocal;
 		this.numBytesInRemote = numBytesInRemote;
 		this.numBytesOut = numBytesOut;
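
This hunk only reformats the ten-argument constructor to one parameter per
line; behavior is unchanged. With ten positional primitives, call sites are
easy to transpose, so commenting each argument keeps them readable. A usage
sketch (assumes flink-runtime on the classpath; the values are made up):

    import org.apache.flink.runtime.executiongraph.IOMetrics;

    public class IOMetricsCallSketch {
        public static void main(String[] args) {
            IOMetrics metrics = new IOMetrics(
                1024,  // numBytesInLocal
                2048,  // numBytesInRemote
                512,   // numBytesOut
                100,   // numRecordsIn
                90,    // numRecordsOut
                10.0,  // numBytesInLocalPerSecond
                20.0,  // numBytesInRemotePerSecond
                5.0,   // numBytesOutPerSecond
                1.0,   // numRecordsInPerSecond
                0.9);  // numRecordsOutPerSecond
            System.out.println(metrics);
        }
    }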

http://git-wip-us.apache.org/repos/asf/flink/blob/821da81f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index f119671..3d2913f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -28,12 +28,17 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -160,7 +165,7 @@ public class ExecutionGraphDeploymentTest {
 			JobVertex v1 = new JobVertex("v1", jid1);
 			JobVertex v2 = new JobVertex("v2", jid2);
 
-			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7650, v2, 2350);
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7650, v2, 2350).f1;
 
 			for (Execution e : executions.values()) {
 				e.markFinished();
@@ -184,7 +189,7 @@ public class ExecutionGraphDeploymentTest {
 			JobVertex v1 = new JobVertex("v1", jid1);
 			JobVertex v2 = new JobVertex("v2", jid2);
 
-			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6).f1;
 
 			for (Execution e : executions.values()) {
 				e.markFailed(null);
@@ -208,7 +213,7 @@ public class ExecutionGraphDeploymentTest {
 			JobVertex v1 = new JobVertex("v1", jid1);
 			JobVertex v2 = new JobVertex("v2", jid2);
 
-			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6).f1;
 
 			for (Execution e : executions.values()) {
 				e.fail(null);
@@ -222,6 +227,85 @@ public class ExecutionGraphDeploymentTest {
 		}
 	}
 
+	/**
+	 * Verifies that {@link ExecutionGraph#updateState(TaskExecutionState)} updates the accumulators and metrics for an
+	 * execution that failed or was canceled.
+	 */
+	@Test
+	public void testAccumulatorsAndMetricsForwarding() throws Exception {
+		final JobVertexID jid1 = new JobVertexID();
+		final JobVertexID jid2 = new JobVertexID();
+
+		JobVertex v1 = new JobVertex("v1", jid1);
+		JobVertex v2 = new JobVertex("v2", jid2);
+
+		Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphAndExecutions = setupExecution(v1, 1, v2, 1);
+		ExecutionGraph graph = graphAndExecutions.f0;
+		
+		// verify behavior for canceled executions
+		Execution execution1 = graphAndExecutions.f1.values().iterator().next();
+
+		IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
+		Map<String, Accumulator<?, ?>> accumulators = new HashMap<>();
+		accumulators.put("acc", new IntCounter(4));
+		AccumulatorSnapshot accumulatorSnapshot = new AccumulatorSnapshot(graph.getJobID(), execution1.getAttemptId(), accumulators);
+		
+		TaskExecutionState state = new TaskExecutionState(graph.getJobID(), execution1.getAttemptId(), ExecutionState.CANCELED, null, accumulatorSnapshot, ioMetrics);
+		
+		graph.updateState(state);
+		
+		assertEquals(ioMetrics, execution1.getIOMetrics());
+		assertNotNull(execution1.getUserAccumulators());
+		assertEquals(4, execution1.getUserAccumulators().get("acc").getLocalValue());
+		
+		// verify behavior for failed executions
+		Execution execution2 = graphAndExecutions.f1.values().iterator().next();
+
+		IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
+		Map<String, Accumulator<?, ?>> accumulators2 = new HashMap<>();
+		accumulators2.put("acc", new IntCounter(8));
+		AccumulatorSnapshot accumulatorSnapshot2 = new AccumulatorSnapshot(graph.getJobID(), execution2.getAttemptId(), accumulators2);
+
+		TaskExecutionState state2 = new TaskExecutionState(graph.getJobID(), execution2.getAttemptId(), ExecutionState.FAILED, null, accumulatorSnapshot2, ioMetrics2);
+
+		graph.updateState(state2);
+
+		assertEquals(ioMetrics2, execution2.getIOMetrics());
+		assertNotNull(execution2.getUserAccumulators());
+		assertEquals(8, execution2.getUserAccumulators().get("acc").getLocalValue());
+	}
+
+	/**
+	 * Verifies that {@link Execution#cancelingComplete(Map, IOMetrics)} and {@link Execution#markFailed(Throwable, Map, IOMetrics)}
+	 * store the given accumulators and metrics correctly.
+	 */
+	@Test
+	public void testAccumulatorsAndMetricsStorage() throws Exception {
+		final JobVertexID jid1 = new JobVertexID();
+		final JobVertexID jid2 = new JobVertexID();
+
+		JobVertex v1 = new JobVertex("v1", jid1);
+		JobVertex v2 = new JobVertex("v2", jid2);
+
+		Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 1, v2, 1).f1;
+		
+		IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
+		Map<String, Accumulator<?, ?>> accumulators = Collections.emptyMap();
+
+		Execution execution1 = executions.values().iterator().next();
+		execution1.cancel();
+		execution1.cancelingComplete(accumulators, ioMetrics);
+		
+		assertEquals(ioMetrics, execution1.getIOMetrics());
+		assertEquals(accumulators, execution1.getUserAccumulators());
+
+		Execution execution2 = executions.values().iterator().next();
+		execution2.markFailed(new Throwable(), accumulators, ioMetrics);
+
+		assertEquals(ioMetrics, execution2.getIOMetrics());
+		assertEquals(accumulators, execution2.getUserAccumulators());
+	}
+
 	@Test
 	public void testRegistrationOfExecutionsCanceled() {
 		try {
@@ -232,7 +316,7 @@ public class ExecutionGraphDeploymentTest {
 			JobVertex v1 = new JobVertex("v1", jid1);
 			JobVertex v2 = new JobVertex("v2", jid2);
 
-			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 19, v2, 37);
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 19, v2, 37).f1;
 
 			for (Execution e : executions.values()) {
 				e.cancel();
@@ -257,7 +341,7 @@ public class ExecutionGraphDeploymentTest {
 			JobVertex v1 = new FailingFinalizeJobVertex("v1", jid1);
 			JobVertex v2 = new JobVertex("v2", jid2);
 
-			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 6, v2, 4);
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 6, v2, 4).f1;
 
 			List<Execution> execList = new ArrayList<Execution>();
 			execList.addAll(executions.values());
@@ -351,7 +435,7 @@ public class ExecutionGraphDeploymentTest {
 		assertEquals(JobStatus.FAILED, eg.getState());
 	}
 
-	private Map<ExecutionAttemptID, Execution> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
+	private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
 		final JobID jobId = new JobID();
 
 		v1.setParallelism(dop1);
@@ -394,7 +478,7 @@ public class ExecutionGraphDeploymentTest {
 		Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions();
 		assertEquals(dop1 + dop2, executions.size());
 
-		return executions;
+		return new Tuple2<>(eg, executions);
 	}
 
 	@SuppressWarnings("serial")