You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/08/28 12:36:31 UTC

[1/2] flink git commit: [hotfix] fix missing decorations for accumulator messages

Repository: flink
Updated Branches:
  refs/heads/master 91069d1f7 -> cf225f0f9


[hotfix] fix missing decorations for accumulator messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9947b907
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9947b907
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9947b907

Branch: refs/heads/master
Commit: 9947b907806c17575b1d49a000a02f25c3b57118
Parents: 91069d1
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Aug 27 10:11:40 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Aug 28 12:33:38 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/jobmanager/JobManager.scala  |  4 +++-
 .../apache/flink/runtime/jobmanager/MemoryArchivist.scala | 10 +++++-----
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9947b907/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 92688fa..d001d5a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -783,7 +783,9 @@ class JobManager(
           currentJobs.get(jobId) match {
             case Some((graph, jobInfo)) =>
               val stringifiedAccumulators = graph.getAccumulatorResultsStringified()
-              sender() ! AccumulatorResultStringsFound(jobId, stringifiedAccumulators)
+              sender() ! decorateMessage(
+                AccumulatorResultStringsFound(jobId, stringifiedAccumulators)
+              )
             case None =>
               archive.forward(message)
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/9947b907/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 9f228ed..e2891de 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -133,23 +133,23 @@ class MemoryArchivist(private val max_entries: Int)
         graphs.get(jobID) match {
           case Some(graph) =>
             val accumulatorValues = graph.getAccumulatorsSerialized()
-            sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
+            sender() ! decorateMessage(AccumulatorResultsFound(jobID, accumulatorValues))
           case None =>
-            sender() ! AccumulatorResultsNotFound(jobID)
+            sender() ! decorateMessage(AccumulatorResultsNotFound(jobID))
         }
       } catch {
         case e: Exception =>
           log.error("Cannot serialize accumulator result.", e)
-          sender() ! AccumulatorResultsErroneous(jobID, e)
+          sender() ! decorateMessage(AccumulatorResultsErroneous(jobID, e))
       }
 
       case RequestAccumulatorResultsStringified(jobID) =>
         graphs.get(jobID) match {
           case Some(graph) =>
             val accumulatorValues = graph.getAccumulatorResultsStringified()
-            sender() ! AccumulatorResultStringsFound(jobID, accumulatorValues)
+            sender() ! decorateMessage(AccumulatorResultStringsFound(jobID, accumulatorValues))
           case None =>
-            sender() ! AccumulatorResultsNotFound(jobID)
+            sender() ! decorateMessage(AccumulatorResultsNotFound(jobID))
         }
   }
 


[2/2] flink git commit: [hotfix] disable object reuse in merging of accumulators

Posted by mx...@apache.org.
[hotfix] disable object reuse in merging of accumulators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf225f0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf225f0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf225f0f

Branch: refs/heads/master
Commit: cf225f0f96bc3682d2886212eb5b7b4c8c64c0b7
Parents: 9947b90
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Aug 28 12:29:56 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Aug 28 12:34:06 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/accumulators/AccumulatorHelper.java       | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf225f0f/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 3907004..4fa173c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -38,14 +38,14 @@ public class AccumulatorHelper {
 		for (Map.Entry<String, Accumulator<?, ?>> otherEntry : toMerge.entrySet()) {
 			Accumulator<?, ?> ownAccumulator = target.get(otherEntry.getKey());
 			if (ownAccumulator == null) {
-				// Take over counter from chained task
-				target.put(otherEntry.getKey(), otherEntry.getValue());
+				// Create initial counter (copy!)
+				target.put(otherEntry.getKey(), otherEntry.getValue().clone());
 			}
 			else {
 				// Both should have the same type
 				AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(),
 						ownAccumulator.getClass(), otherEntry.getValue().getClass());
-				// Merge counter from chained task into counter from stub
+				// Merge target counter with other counter
 				mergeSingle(ownAccumulator, otherEntry.getValue());
 			}
 		}