Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/27 01:52:09 UTC

spark git commit: [SPARK-732][SPARK-3628][CORE][RESUBMIT] eliminate duplicate update on accumulator

Repository: spark
Updated Branches:
  refs/heads/master 561d31d2f -> 5af53ada6


[SPARK-732][SPARK-3628][CORE][RESUBMIT] eliminate duplicate update on accumulator

https://issues.apache.org/jira/browse/SPARK-3628

In the current implementation, the accumulator is updated for every successfully finished task, even when the task comes from a resubmitted stage, which makes the accumulator's value counter-intuitive. For example, if every task in a stage adds 1 to an accumulator and the stage is resubmitted after a failure, each partition's contribution can be counted twice.

In this patch, I changed the way the DAGScheduler updates accumulators.

The DAGScheduler maintains a hash table mapping each stage id to the received <accumulator_id, value> pairs. Only when a stage becomes independent (no job needs it any more) do we accumulate the values of its <accumulator_id, value> pairs. When a task finishes, we check whether the hash table already contains that stage id; the <accumulator_id, value> pair is saved only when the task is the first finished task of a new stage or the stage is running its first attempt...
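
As a rough sketch of this de-duplication idea (the names AccumUpdateDeduper and shouldApply are hypothetical, not the actual patch code; the final patch instead guards the updateAccumulators call sites shown in the diff below):

import scala.collection.mutable

// Hypothetical sketch: apply accumulator updates at most once per
// (stage, partition) pair, and forget a stage once no job needs it.
class AccumUpdateDeduper {
  // stageId -> partition ids whose accumulator updates were already applied
  private val applied = mutable.Map[Int, mutable.Set[Int]]()

  // Returns true exactly once per (stageId, partitionId) pair;
  // Set.add returns false when the element is already present.
  def shouldApply(stageId: Int, partitionId: Int): Boolean = synchronized {
    applied.getOrElseUpdate(stageId, mutable.Set[Int]()).add(partitionId)
  }

  // Drop the bookkeeping once the stage becomes independent.
  def clearStage(stageId: Int): Unit = synchronized {
    applied -= stageId
  }
}

With such a guard, a completion event from a resubmitted attempt of an already-finished partition is simply skipped for accumulator purposes.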

Author: CodingCat <zh...@gmail.com>

Closes #2524 from CodingCat/SPARK-732-1 and squashes the following commits:

701a1e8 [CodingCat] roll back change on Accumulator.scala
1433e6f [CodingCat] make MIMA happy
b233737 [CodingCat] address Matei's comments
02261b8 [CodingCat] rollback  some changes
6b0aff9 [CodingCat] update document
2b2e8cf [CodingCat] updateAccumulator
83b75f8 [CodingCat] style fix
84570d2 [CodingCat] re-enable  the bad accumulator guard
1e9e14d [CodingCat] add NPE guard
21b6840 [CodingCat] simplify the patch
88d1f03 [CodingCat] fix rebase error
f74266b [CodingCat] add test case for resubmitted result stage
5cf586f [CodingCat] de-duplicate on task level
138f9b3 [CodingCat] make MIMA happy
67593d2 [CodingCat] make if allowing duplicate update as an option of accumulator


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5af53ada
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5af53ada
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5af53ada

Branch: refs/heads/master
Commit: 5af53ada65f62e6b5987eada288fb48e9211ef9d
Parents: 561d31d
Author: CodingCat <zh...@gmail.com>
Authored: Wed Nov 26 16:52:04 2014 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Wed Nov 26 16:52:04 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   |  4 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 53 +++++++++++---------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 34 ++++++++++---
 docs/programming-guide.md                       |  6 +++
 4 files changed, 67 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5af53ada/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index dc1e8f6..000bbd6 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io.{ObjectInputStream, Serializable}
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
@@ -228,6 +229,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
  */
 class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
     extends Accumulable[T,T](initialValue, param, name) {
+
   def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
 }
 
@@ -282,7 +284,7 @@ private object Accumulators {
   val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
   var lastId: Long = 0
 
-  def newId: Long = synchronized {
+  def newId(): Long = synchronized {
     lastId += 1
     lastId
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5af53ada/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b1222af..cb8ccfb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -449,7 +449,6 @@ class DAGScheduler(
               }
               // data structures based on StageId
               stageIdToStage -= stageId
-
               logDebug("After removal of stage %d, remaining stages = %d"
                 .format(stageId, stageIdToStage.size))
             }
@@ -902,6 +901,34 @@ class DAGScheduler(
     }
   }
 
+  /** Merge updates from a task to our local accumulator values */
+  private def updateAccumulators(event: CompletionEvent): Unit = {
+    val task = event.task
+    val stage = stageIdToStage(task.stageId)
+    if (event.accumUpdates != null) {
+      try {
+        Accumulators.add(event.accumUpdates)
+        event.accumUpdates.foreach { case (id, partialValue) =>
+          val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
+          // To avoid UI cruft, ignore cases where value wasn't updated
+          if (acc.name.isDefined && partialValue != acc.zero) {
+            val name = acc.name.get
+            val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
+            val stringValue = Accumulators.stringifyValue(acc.value)
+            stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
+            event.taskInfo.accumulables +=
+              AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
+          }
+        }
+      } catch {
+        // If we see an exception during accumulator update, just log the
+        // error and move on.
+        case e: Exception =>
+          logError(s"Failed to update accumulators for $task", e)
+      }
+    }
+  }
+
   /**
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -942,27 +969,6 @@ class DAGScheduler(
     }
     event.reason match {
       case Success =>
-        if (event.accumUpdates != null) {
-          try {
-            Accumulators.add(event.accumUpdates)
-            event.accumUpdates.foreach { case (id, partialValue) =>
-              val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
-              // To avoid UI cruft, ignore cases where value wasn't updated
-              if (acc.name.isDefined && partialValue != acc.zero) {
-                val name = acc.name.get
-                val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
-                val stringValue = Accumulators.stringifyValue(acc.value)
-                stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
-                event.taskInfo.accumulables +=
-                  AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
-              }
-            }
-          } catch {
-            // If we see an exception during accumulator update, just log the error and move on.
-            case e: Exception =>
-              logError(s"Failed to update accumulators for $task", e)
-          }
-        }
         listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
           event.reason, event.taskInfo, event.taskMetrics))
         stage.pendingTasks -= task
@@ -971,6 +977,7 @@ class DAGScheduler(
             stage.resultOfJob match {
               case Some(job) =>
                 if (!job.finished(rt.outputId)) {
+                  updateAccumulators(event)
                   job.finished(rt.outputId) = true
                   job.numFinished += 1
                   // If the whole job has finished, remove it
@@ -995,6 +1002,7 @@ class DAGScheduler(
             }
 
           case smt: ShuffleMapTask =>
+            updateAccumulators(event)
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
@@ -1083,7 +1091,6 @@ class DAGScheduler(
         }
         failedStages += failedStage
         failedStages += mapStage
-
         // Mark the map whose fetch failed as broken in the map stage
         if (mapId != -1) {
           mapStage.removeOutputLoc(mapId, bmAddress)

http://git-wip-us.apache.org/repos/asf/spark/blob/5af53ada/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 819f956..bdd721d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -207,7 +207,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assert(taskSet.tasks.size >= results.size)
     for ((result, i) <- results.zipWithIndex) {
       if (i < taskSet.tasks.size) {
-        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any](), null, null))
+        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
+      }
+    }
+  }
+
+  private def completeWithAccumulator(accumId: Long, taskSet: TaskSet,
+                                      results: Seq[(TaskEndReason, Any)]) {
+    assert(taskSet.tasks.size >= results.size)
+    for ((result, i) <- results.zipWithIndex) {
+      if (i < taskSet.tasks.size) {
+        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
+          Map[Long, Any]((accumId, 1)), null, null))
       }
     }
   }
@@ -493,17 +504,16 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     runEvent(ExecutorLost("exec-hostA"))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
-    val noAccum = Map[Long, Any]()
     val taskSet = taskSets(0)
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
     // should work because it's a non-failed host
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null))
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
     // should work because it's a new epoch
     taskSet.tasks(1).epoch = newEpoch
-    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
     complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -728,6 +738,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assert(scheduler.sc.dagScheduler === null)
   }
 
+  test("accumulator not calculated for resubmitted result stage") {
+    // just for registering the accumulator
+    val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
+    val finalRdd = new MyRDD(sc, 1, Nil)
+    submit(finalRdd, Array(0))
+    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assert(Accumulators.originals(accum.id).value === 1)
+    assertDataStructuresEmpty
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.

http://git-wip-us.apache.org/repos/asf/spark/blob/5af53ada/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 49f319b..c60de6e 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1306,6 +1306,12 @@ vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
 
 </div>
 
+For accumulator updates performed inside <b>actions only</b>, Spark guarantees that each task's update to the accumulator
+will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware
+that each task's update may be applied more than once if tasks or job stages are re-executed.
+
+
+
 # Deploying to a Cluster
 
 The [application submission guide](submitting-applications.html) describes how to submit applications to a cluster.
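
To make the guarantee documented above concrete, here is a minimal sketch using the Spark 1.x accumulator API (assuming an existing SparkContext named sc; the comments describe the intended semantics, not output produced by this commit):

val acc = sc.accumulator(0, "records seen")
val data = sc.parallelize(1 to 100)

// Update inside a transformation: if this stage is re-executed (e.g. after
// a fetch failure, or because the RDD is recomputed), the increments may be
// applied more than once.
val doubled = data.map { x => acc += 1; x * 2 }

// Update inside an action: the scheduler applies each task's update exactly
// once, even if tasks are resubmitted.
doubled.foreach { _ => () }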

