You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/09/30 01:10:10 UTC
spark git commit: [SPARK-25568][CORE] Continue to update the
remaining accumulators when failing to update one accumulator
Repository: spark
Updated Branches:
refs/heads/master f4b138082 -> b6b8a6632
[SPARK-25568][CORE] Continue to update the remaining accumulators when failing to update one accumulator
## What changes were proposed in this pull request?
Since we don't fail a job when `AccumulatorV2.merge` fails, we should try to update the remaining accumulators so that they can still report correct values.
## How was this patch tested?
The new unit test.
Closes #22586 from zsxwing/SPARK-25568.
Authored-by: Shixiong Zhu <zs...@gmail.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6b8a663
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6b8a663
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6b8a663
Branch: refs/heads/master
Commit: b6b8a6632e2b6e5482aaf4bfa093700752a9df80
Parents: f4b1380
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Sat Sep 29 18:10:04 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Sep 29 18:10:04 2018 -0700
----------------------------------------------------------------------
.../apache/spark/scheduler/DAGScheduler.scala | 20 ++++++++++++++------
.../spark/scheduler/DAGSchedulerSuite.scala | 20 ++++++++++++++++++++
docs/rdd-programming-guide.md | 4 ++++
3 files changed, 38 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b6b8a663/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4710835..f93d8a8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1245,9 +1245,10 @@ private[spark] class DAGScheduler(
private def updateAccumulators(event: CompletionEvent): Unit = {
val task = event.task
val stage = stageIdToStage(task.stageId)
- try {
- event.accumUpdates.foreach { updates =>
- val id = updates.id
+
+ event.accumUpdates.foreach { updates =>
+ val id = updates.id
+ try {
// Find the corresponding accumulator on the driver and update it
val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]]
@@ -1261,10 +1262,17 @@ private[spark] class DAGScheduler(
event.taskInfo.setAccumulables(
acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
}
+ } catch {
+ case NonFatal(e) =>
+ // Log the class name to make it easy to find the bad implementation
+ val accumClassName = AccumulatorContext.get(id) match {
+ case Some(accum) => accum.getClass.getName
+ case None => "Unknown class"
+ }
+ logError(
+ s"Failed to update accumulator $id ($accumClassName) for task ${task.partitionId}",
+ e)
}
- } catch {
- case NonFatal(e) =>
- logError(s"Failed to update accumulators for task ${task.partitionId}", e)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b6b8a663/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d6c9ae6..b41d2ac 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1880,6 +1880,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(sc.parallelize(1 to 10, 2).count() === 10)
}
+ test("misbehaved accumulator should not impact other accumulators") {
+ val bad = new LongAccumulator {
+ override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit = {
+ throw new DAGSchedulerSuiteDummyException
+ }
+ }
+ sc.register(bad, "bad")
+ val good = sc.longAccumulator("good")
+
+ sc.parallelize(1 to 10, 2).foreach { item =>
+ bad.add(1)
+ good.add(1)
+ }
+
+ // `bad.merge` always throws, so every merge of task updates fails and `bad` keeps its initial value
+ assert(bad.value == 0L)
+ // Merge failures are caught per accumulator, so `good` still receives one update per element (10 total)
+ assert(good.value == 10L)
+ }
+
/**
* The job will be failed on first task throwing a DAGSchedulerSuiteDummyException.
* Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException.
http://git-wip-us.apache.org/repos/asf/spark/blob/b6b8a663/docs/rdd-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/rdd-programming-guide.md b/docs/rdd-programming-guide.md
index 0054257..9a07d6c 100644
--- a/docs/rdd-programming-guide.md
+++ b/docs/rdd-programming-guide.md
@@ -1465,6 +1465,10 @@ jsc.sc().register(myVectorAcc, "MyVectorAcc1");
Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different than that of the elements added.
+*Warning*: When a Spark task finishes, Spark will try to merge the updates accumulated by this task into the accumulator.
+If the merge fails, Spark logs the failure but still marks the task successful and continues to run other tasks. Hence,
+a buggy accumulator will not fail a Spark job, but its value may be incorrect even though the job succeeds.
+
</div>
<div data-lang="python" markdown="1">
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org