You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/30 16:57:41 UTC
flink git commit: [cleanup] move updateAccumulators method to a more sensible location
Repository: flink
Updated Branches:
refs/heads/master 83e14cb15 -> 1919ae735
[cleanup] move updateAccumulators method to a more sensible location
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1919ae73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1919ae73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1919ae73
Branch: refs/heads/master
Commit: 1919ae735ca557dad5f05bf787599b76fe80928f
Parents: 83e14cb
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Jul 30 16:49:01 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Jul 30 16:57:05 2015 +0200
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 48 ++++++++++----------
1 file changed, 24 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1919ae73/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 833518c..be92bd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -137,30 +137,6 @@ public class ExecutionGraph implements Serializable {
/** The currently executed tasks, for callbacks */
private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
- /**
- * Updates the accumulators during the runtime of a job. Final accumulator results are transferred
- * through the UpdateTaskExecutionState message.
- * @param accumulatorSnapshot The serialized flink and user-defined accumulators
- */
- public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
- Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
- Map<String, Accumulator<?, ?>> userAccumulators;
- try {
- flinkAccumulators = accumulatorSnapshot.deserializeFlinkAccumulators();
- userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader);
-
- ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID();
- Execution execution = currentExecutions.get(execID);
- if (execution != null) {
- execution.setAccumulators(flinkAccumulators, userAccumulators);
- } else {
- LOG.warn("Received accumulator result for unknown execution {}.", execID);
- }
- } catch (Exception e) {
- LOG.error("Cannot update accumulators for job " + jobID, e);
- }
- }
-
/** A list of all libraries required during the job execution. Libraries have to be stored
* inside the BlobService and are referenced via the BLOB keys. */
private final List<BlobKey> requiredJarFiles;
@@ -1007,6 +983,30 @@ public class ExecutionGraph implements Serializable {
}
}
+ /**
+ * Updates the accumulators during the runtime of a job. Final accumulator results are transferred
+ * through the UpdateTaskExecutionState message.
+ * @param accumulatorSnapshot The serialized flink and user-defined accumulators
+ */
+ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
+ Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+ Map<String, Accumulator<?, ?>> userAccumulators;
+ try {
+ flinkAccumulators = accumulatorSnapshot.deserializeFlinkAccumulators();
+ userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader);
+
+ ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID();
+ Execution execution = currentExecutions.get(execID);
+ if (execution != null) {
+ execution.setAccumulators(flinkAccumulators, userAccumulators);
+ } else {
+ LOG.warn("Received accumulator result for unknown execution {}.", execID);
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot update accumulators for job " + jobID, e);
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Listeners & Observers
// --------------------------------------------------------------------------------------------