Posted to commits@spark.apache.org by jo...@apache.org on 2016/02/20 00:57:41 UTC

spark git commit: [SPARK-13407] Guard against garbage-collected accumulators in TaskMetrics.fromAccumulatorUpdates

Repository: spark
Updated Branches:
  refs/heads/master 091f6a783 -> 983fa2d62


[SPARK-13407] Guard against garbage-collected accumulators in TaskMetrics.fromAccumulatorUpdates

`TaskMetrics.fromAccumulatorUpdates()` can fail if accumulators have been garbage-collected on the driver. To guard against this, the patch introduces `ListenerTaskMetrics`, a subclass of `TaskMetrics` that is used only in `TaskMetrics.fromAccumulatorUpdates()` and eliminates the need to access the original accumulators on the driver.
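
As a minimal, self-contained sketch of the idea (simplified stand-in types, not the real Spark classes): the old code path had to look each update's accumulator up in the driver-side registry of weakly referenced accumulators, and that lookup can come back empty once an accumulator has been garbage-collected, whereas a listener-facing metrics object can simply carry the update snapshots it was built from.

```scala
// Simplified stand-ins, not the real Spark classes: just enough to show why
// carrying the update snapshots avoids the garbage-collection problem.
import scala.collection.mutable

final case class AccumInfo(id: Long, name: Option[String], update: Option[Any])

object Registry {
  // Stand-in for the driver-side `Accumulators` registry, which holds weak
  // references; lookups can fail once an accumulator has been collected.
  private val accums = mutable.Map.empty[Long, AnyRef]
  def get(id: Long): Option[AnyRef] = accums.get(id)
}

// Listener-facing metrics: built directly from the received update snapshots,
// so reporting them back out never needs a registry lookup.
class ListenerMetricsSketch(updates: Seq[AccumInfo]) {
  def accumulatorUpdates(): Seq[AccumInfo] = updates
}

object Demo extends App {
  val updates = Seq(AccumInfo(1L, Some("internal.metrics.executorRunTime"), Some(42L)))
  println(Registry.get(1L))                                        // None: nothing to look up
  println(new ListenerMetricsSketch(updates).accumulatorUpdates()) // snapshots still available
}
```

The real `ListenerTaskMetrics` in the diff below does the equivalent by overriding `accumulatorUpdates()` to return the `AccumulableInfo` updates that were passed to `TaskMetrics.fromAccumulatorUpdates()`.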

Author: Josh Rosen <jo...@databricks.com>

Closes #11276 from JoshRosen/accum-updates-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/983fa2d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/983fa2d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/983fa2d6

Branch: refs/heads/master
Commit: 983fa2d62029e7334fb661cb65c8cadaa4b86d1c
Parents: 091f6a7
Author: Josh Rosen <jo...@databricks.com>
Authored: Fri Feb 19 15:57:23 2016 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Fri Feb 19 15:57:23 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/executor/TaskMetrics.scala | 55 +++++++++++---------
 .../spark/executor/TaskMetricsSuite.scala       | 10 ++--
 2 files changed, 33 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/983fa2d6/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 8ff0620..9da9cb5 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -364,6 +364,27 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
 
 }
 
+/**
+ * Internal subclass of [[TaskMetrics]] which is used only for posting events to listeners.
+ * Its purpose is to obviate the need for the driver to reconstruct the original accumulators,
+ * which might have been garbage-collected. See SPARK-13407 for more details.
+ *
+ * Instances of this class should be considered read-only and users should not call `inc*()` or
+ * `set*()` methods. While we could override the setter methods to throw
+ * UnsupportedOperationException, we choose not to do so because the overrides would quickly become
+ * out-of-date when new metrics are added.
+ */
+private[spark] class ListenerTaskMetrics(
+    initialAccums: Seq[Accumulator[_]],
+    accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics(initialAccums) {
+
+  override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates
+
+  override private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
+    throw new UnsupportedOperationException("This TaskMetrics is read-only")
+  }
+}
+
 private[spark] object TaskMetrics extends Logging {
 
   def empty: TaskMetrics = new TaskMetrics
@@ -397,33 +418,15 @@ private[spark] object TaskMetrics extends Logging {
     // Initial accumulators are passed into the TaskMetrics constructor first because these
     // are required to be uniquely named. The rest of the accumulators from this task are
     // registered later because they need not satisfy this requirement.
-    val (initialAccumInfos, otherAccumInfos) = accumUpdates
-      .filter { info => info.update.isDefined }
-      .partition { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
-    val initialAccums = initialAccumInfos.map { info =>
-      val accum = InternalAccumulator.create(info.name.get)
-      accum.setValueAny(info.update.get)
-      accum
-    }
-    // We don't know the types of the rest of the accumulators, so we try to find the same ones
-    // that were previously registered here on the driver and make copies of them. It is important
-    // that we copy the accumulators here since they are used across many tasks and we want to
-    // maintain a snapshot of their local task values when we post them to listeners downstream.
-    val otherAccums = otherAccumInfos.flatMap { info =>
-      val id = info.id
-      val acc = Accumulators.get(id).map { a =>
-        val newAcc = a.copy()
-        newAcc.setValueAny(info.update.get)
-        newAcc
+    val definedAccumUpdates = accumUpdates.filter { info => info.update.isDefined }
+    val initialAccums = definedAccumUpdates
+      .filter { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
+      .map { info =>
+        val accum = InternalAccumulator.create(info.name.get)
+        accum.setValueAny(info.update.get)
+        accum
       }
-      if (acc.isEmpty) {
-        logWarning(s"encountered unregistered accumulator $id when reconstructing task metrics.")
-      }
-      acc
-    }
-    val metrics = new TaskMetrics(initialAccums)
-    otherAccums.foreach(metrics.registerAccumulator)
-    metrics
+    new ListenerTaskMetrics(initialAccums, definedAccumUpdates)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/983fa2d6/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 3a1a67c..d91f50f 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -475,10 +475,9 @@ class TaskMetricsSuite extends SparkFunSuite {
     }
     val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
     assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1)
-    // Test this with additional accumulators. Only the ones registered with `Accumulators`
-    // will show up in the reconstructed TaskMetrics. In practice, all accumulators created
+    // Test this with additional accumulators to ensure that we do not crash when handling
+    // updates from unregistered accumulators. In practice, all accumulators created
     // on the driver, internal or not, should be registered with `Accumulators` at some point.
-    // Here we show that reconstruction will succeed even if there are unregistered accumulators.
     val param = IntAccumulatorParam
     val registeredAccums = Seq(
       new Accumulator(0, param, Some("a"), internal = true, countFailedValues = true),
@@ -497,9 +496,8 @@ class TaskMetricsSuite extends SparkFunSuite {
     val registeredAccumInfos = registeredAccums.map(makeInfo)
     val unregisteredAccumInfos = unregisteredAccums.map(makeInfo)
     val accumUpdates2 = accumUpdates1 ++ registeredAccumInfos ++ unregisteredAccumInfos
-    val metrics2 = TaskMetrics.fromAccumulatorUpdates(accumUpdates2)
-    // accumulators that were not registered with `Accumulators` will not show up
-    assertUpdatesEquals(metrics2.accumulatorUpdates(), accumUpdates1 ++ registeredAccumInfos)
+    // Simply checking that this does not crash:
+    TaskMetrics.fromAccumulatorUpdates(accumUpdates2)
   }
 }
 

