You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/30 16:57:41 UTC

flink git commit: [cleanup] move updateAccumulators method to a more sensible location

Repository: flink
Updated Branches:
  refs/heads/master 83e14cb15 -> 1919ae735


[cleanup] move updateAccumulators method to a more sensible location


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1919ae73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1919ae73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1919ae73

Branch: refs/heads/master
Commit: 1919ae735ca557dad5f05bf787599b76fe80928f
Parents: 83e14cb
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Jul 30 16:49:01 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Jul 30 16:57:05 2015 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 48 ++++++++++----------
 1 file changed, 24 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1919ae73/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 833518c..be92bd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -137,30 +137,6 @@ public class ExecutionGraph implements Serializable {
 	/** The currently executed tasks, for callbacks */
 	private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
 
-	/**
-	 * Updates the accumulators during the runtime of a job. Final accumulator results are transferred
-	 * through the UpdateTaskExecutionState message.
-	 * @param accumulatorSnapshot The serialized flink and user-defined accumulators
-	 */
-	public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
-		Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
-		Map<String, Accumulator<?, ?>> userAccumulators;
-		try {
-			flinkAccumulators = accumulatorSnapshot.deserializeFlinkAccumulators();
-			userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader);
-
-			ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID();
-			Execution execution = currentExecutions.get(execID);
-			if (execution != null) {
-				execution.setAccumulators(flinkAccumulators, userAccumulators);
-			} else {
-				LOG.warn("Received accumulator result for unknown execution {}.", execID);
-			}
-		} catch (Exception e) {
-			LOG.error("Cannot update accumulators for job " + jobID, e);
-		}
-	}
-
 	/** A list of all libraries required during the job execution. Libraries have to be stored
 	 * inside the BlobService and are referenced via the BLOB keys. */
 	private final List<BlobKey> requiredJarFiles;
@@ -1007,6 +983,30 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
+	/**
+	 * Updates the accumulators during the runtime of a job. Final accumulator results are transferred
+	 * through the UpdateTaskExecutionState message.
+	 * @param accumulatorSnapshot The serialized flink and user-defined accumulators
+	 */
+	public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
+		Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+		Map<String, Accumulator<?, ?>> userAccumulators;
+		try {
+			flinkAccumulators = accumulatorSnapshot.deserializeFlinkAccumulators();
+			userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader);
+
+			ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID();
+			Execution execution = currentExecutions.get(execID);
+			if (execution != null) {
+				execution.setAccumulators(flinkAccumulators, userAccumulators);
+			} else {
+				LOG.warn("Received accumulator result for unknown execution {}.", execID);
+			}
+		} catch (Exception e) {
+			LOG.error("Cannot update accumulators for job " + jobID, e);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Listeners & Observers
 	// --------------------------------------------------------------------------------------------