You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/08/28 12:36:31 UTC
[1/2] flink git commit: [hotfix] fix missing decorations for accumulator messages
Repository: flink
Updated Branches:
refs/heads/master 91069d1f7 -> cf225f0f9
[hotfix] fix missing decorations for accumulator messages
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9947b907
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9947b907
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9947b907
Branch: refs/heads/master
Commit: 9947b907806c17575b1d49a000a02f25c3b57118
Parents: 91069d1
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Aug 27 10:11:40 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Aug 28 12:33:38 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/jobmanager/JobManager.scala | 4 +++-
.../apache/flink/runtime/jobmanager/MemoryArchivist.scala | 10 +++++-----
2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9947b907/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 92688fa..d001d5a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -783,7 +783,9 @@ class JobManager(
currentJobs.get(jobId) match {
case Some((graph, jobInfo)) =>
val stringifiedAccumulators = graph.getAccumulatorResultsStringified()
- sender() ! AccumulatorResultStringsFound(jobId, stringifiedAccumulators)
+ sender() ! decorateMessage(
+ AccumulatorResultStringsFound(jobId, stringifiedAccumulators)
+ )
case None =>
archive.forward(message)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9947b907/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 9f228ed..e2891de 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -133,23 +133,23 @@ class MemoryArchivist(private val max_entries: Int)
graphs.get(jobID) match {
case Some(graph) =>
val accumulatorValues = graph.getAccumulatorsSerialized()
- sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
+ sender() ! decorateMessage(AccumulatorResultsFound(jobID, accumulatorValues))
case None =>
- sender() ! AccumulatorResultsNotFound(jobID)
+ sender() ! decorateMessage(AccumulatorResultsNotFound(jobID))
}
} catch {
case e: Exception =>
log.error("Cannot serialize accumulator result.", e)
- sender() ! AccumulatorResultsErroneous(jobID, e)
+ sender() ! decorateMessage(AccumulatorResultsErroneous(jobID, e))
}
case RequestAccumulatorResultsStringified(jobID) =>
graphs.get(jobID) match {
case Some(graph) =>
val accumulatorValues = graph.getAccumulatorResultsStringified()
- sender() ! AccumulatorResultStringsFound(jobID, accumulatorValues)
+ sender() ! decorateMessage(AccumulatorResultStringsFound(jobID, accumulatorValues))
case None =>
- sender() ! AccumulatorResultsNotFound(jobID)
+ sender() ! decorateMessage(AccumulatorResultsNotFound(jobID))
}
}
[2/2] flink git commit: [hotfix] disable object reuse in merging of accumulators
Posted by mx...@apache.org.
[hotfix] disable object reuse in merging of accumulators
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf225f0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf225f0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf225f0f
Branch: refs/heads/master
Commit: cf225f0f96bc3682d2886212eb5b7b4c8c64c0b7
Parents: 9947b90
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Aug 28 12:29:56 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Aug 28 12:34:06 2015 +0200
----------------------------------------------------------------------
.../flink/api/common/accumulators/AccumulatorHelper.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cf225f0f/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 3907004..4fa173c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -38,14 +38,14 @@ public class AccumulatorHelper {
for (Map.Entry<String, Accumulator<?, ?>> otherEntry : toMerge.entrySet()) {
Accumulator<?, ?> ownAccumulator = target.get(otherEntry.getKey());
if (ownAccumulator == null) {
- // Take over counter from chained task
- target.put(otherEntry.getKey(), otherEntry.getValue());
+ // Create initial counter (copy!)
+ target.put(otherEntry.getKey(), otherEntry.getValue().clone());
}
else {
// Both should have the same type
AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(),
ownAccumulator.getClass(), otherEntry.getValue().getClass());
- // Merge counter from chained task into counter from stub
+ // Merge target counter with other counter
mergeSingle(ownAccumulator, otherEntry.getValue());
}
}