You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/10/18 13:57:41 UTC
[1/2] flink git commit: [FLINK-4829] snapshot accumulators on a
best-effort basis
Repository: flink
Updated Branches:
refs/heads/master f46ca3918 -> d95929e01
[FLINK-4829] snapshot accumulators on a best-effort basis
Heartbeats should not fail when accumulators could not be snapshotted. Instead,
we should simply skip the reporting of the failed accumulator. Eventually, the
accumulator will be reported; at the latest, when the job finishes.
This closes #2649
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d95929e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d95929e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d95929e0
Branch: refs/heads/master
Commit: d95929e0110b53f03452e1ad453de2522f79a6b8
Parents: 783dca5
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Oct 17 14:19:00 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Oct 18 15:55:33 2016 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/taskmanager/TaskManager.scala | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d95929e0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f8f333e..1017ea0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -35,6 +35,7 @@ import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet,
import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
import com.fasterxml.jackson.databind.ObjectMapper
import grizzled.slf4j.Logger
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.flink.configuration._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
@@ -1335,9 +1336,15 @@ class TaskManager(
runningTasks.asScala foreach {
case (execID, task) =>
- val registry = task.getAccumulatorRegistry
- val accumulators = registry.getSnapshot
- accumulatorEvents.append(accumulators)
+ try {
+ val registry = task.getAccumulatorRegistry
+ val accumulators = registry.getSnapshot
+ accumulatorEvents.append(accumulators)
+ } catch {
+ case e: Exception =>
+ log.warn("Failed to take accumulator snapshot for task {}.",
+ execID, ExceptionUtils.getRootCause(e))
+ }
}
currentJobManager foreach {
[2/2] flink git commit: [FLINK-4829] protect user accumulators
against concurrent updates
Posted by mx...@apache.org.
[FLINK-4829] protect user accumulators against concurrent updates
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/783dca56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/783dca56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/783dca56
Branch: refs/heads/master
Commit: 783dca56eedc95f0a8974a9b50f2b532ca8cf849
Parents: f46ca39
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Oct 14 15:15:50 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Oct 18 15:55:33 2016 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/accumulators/AccumulatorRegistry.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/783dca56/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
index 41af2a9..44714e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
@@ -44,7 +45,8 @@ public class AccumulatorRegistry {
new HashMap<Metric, Accumulator<?, ?>>();
/* User-defined Accumulator values stored for the executing task. */
- private final Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
+ private final Map<String, Accumulator<?, ?>> userAccumulators =
+ new ConcurrentHashMap<>(4);
/* The reporter reference that is handed to the reporting tasks. */
private final ReadWriteReporter reporter;