Posted to commits@spark.apache.org by an...@apache.org on 2016/04/14 19:55:04 UTC

spark git commit: [SPARK-14619] Track internal accumulators (metrics) by stage attempt

Repository: spark
Updated Branches:
  refs/heads/master 9fa43a33b -> dac40b68d


[SPARK-14619] Track internal accumulators (metrics) by stage attempt

## What changes were proposed in this pull request?
When there are multiple attempts for a stage, we currently reset internal accumulator values only if all the tasks are resubmitted. It would make more sense to reset the accumulator values for each stage attempt. This will allow us to eventually get rid of the internal flag in the Accumulator class. This is part of my larger effort to simplify accumulators and task metrics. A simplified sketch of the per-attempt approach follows below.
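
The diff attaches a fresh set of internal accumulators to each attempt's StageInfo instead of conditionally resetting a single set stored on the Stage. Below is a minimal, self-contained Scala sketch of that idea; the names (`PerAttemptAccumulators`, `StageAttemptInfo`, the simplified `Accumulator`) are illustrative stand-ins, not the real Spark classes.

```scala
// Simplified model of per-attempt internal accumulators (not actual Spark code):
// every stage attempt gets its own accumulator instances, so a retried attempt
// cannot clobber partial values already recorded by an earlier attempt.
object PerAttemptAccumulators {

  // Stand-in for Spark's Accumulator: just a named mutable counter.
  final case class Accumulator(name: String, var value: Long = 0L)

  // Stand-in for StageInfo: one snapshot per attempt, each carrying
  // freshly created internal accumulators.
  final case class StageAttemptInfo(
      stageId: Int,
      attemptId: Int,
      internalAccumulators: Seq[Accumulator])

  final class Stage(val id: Int) {
    private var nextAttemptId = 0
    private var _latestInfo: Option[StageAttemptInfo] = None

    def latestInfo: StageAttemptInfo =
      _latestInfo.getOrElse(sys.error("no attempt created yet"))

    // Analogous to Stage.makeNewStageAttempt in the diff: new accumulators are
    // created for every attempt instead of being reset on the Stage itself.
    def makeNewStageAttempt(): StageAttemptInfo = {
      val accums = Seq(
        Accumulator("internal.metrics.executorRunTime"),
        Accumulator("internal.metrics.recordsRead"))
      val info = StageAttemptInfo(id, nextAttemptId, accums)
      nextAttemptId += 1
      _latestInfo = Some(info)
      info
    }
  }

  def main(args: Array[String]): Unit = {
    val stage = new Stage(0)

    val attempt0 = stage.makeNewStageAttempt()
    attempt0.internalAccumulators.head.value += 42   // a task from attempt 0 reports a metric

    val attempt1 = stage.makeNewStageAttempt()       // retry: gets fresh accumulators
    assert(attempt1.internalAccumulators.head.value == 0L)
    assert(attempt0.internalAccumulators.head.value == 42L)  // attempt 0 values untouched

    println(s"attempt 0 = ${attempt0.internalAccumulators.head.value}, " +
      s"attempt 1 = ${attempt1.internalAccumulators.head.value}")
  }
}
```

Keeping the accumulators on the per-attempt StageInfo also means scheduler and task-creation code can read them through `stage.latestInfo.internalAccumulators`, which is exactly how the DAGScheduler changes below consume them.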

## How was this patch tested?
Covered by existing tests.

Author: Reynold Xin <rx...@databricks.com>

Closes #12378 from rxin/SPARK-14619.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dac40b68
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dac40b68
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dac40b68

Branch: refs/heads/master
Commit: dac40b68dc52d5ab855dfde63f0872064aa3d999
Parents: 9fa43a3
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Apr 14 10:54:57 2016 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Apr 14 10:54:57 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/InternalAccumulator.scala   |  2 +-
 .../apache/spark/scheduler/DAGScheduler.scala    | 11 ++---------
 .../scala/org/apache/spark/scheduler/Stage.scala | 19 ++-----------------
 .../org/apache/spark/scheduler/StageInfo.scala   | 10 +++++++++-
 .../scala/org/apache/spark/ui/jobs/JobPage.scala |  2 +-
 .../org/apache/spark/util/JsonProtocol.scala     |  6 ++++--
 .../spark/ExecutorAllocationManagerSuite.scala   |  4 ++--
 .../scala/org/apache/spark/ShuffleSuite.scala    |  6 +++---
 .../spark/scheduler/DAGSchedulerSuite.scala      |  2 +-
 .../sql/execution/UnsafeRowSerializerSuite.scala |  2 +-
 10 files changed, 26 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dac40b68/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index 7aa9057..0dd4ec6 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -187,7 +187,7 @@ private[spark] object InternalAccumulator {
    * add to the same set of accumulators. We do this to report the distribution of accumulator
    * values across all tasks within each stage.
    */
-  def create(sc: SparkContext): Seq[Accumulator[_]] = {
+  def createAll(sc: SparkContext): Seq[Accumulator[_]] = {
     val accums = createAll()
     accums.foreach { accum =>
       Accumulators.register(accum)

http://git-wip-us.apache.org/repos/asf/spark/blob/dac40b68/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4609b24..c27aad2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -950,13 +950,6 @@ class DAGScheduler(
     // First figure out the indexes of partition ids to compute.
     val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
 
-    // Create internal accumulators if the stage has no accumulators initialized.
-    // Reset internal accumulators only if this stage is not partially submitted
-    // Otherwise, we may override existing accumulator values from some tasks
-    if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {
-      stage.resetInternalAccumulators()
-    }
-
     // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
     // with this Stage
     val properties = jobIdToActiveJob(jobId).properties
@@ -1036,7 +1029,7 @@ class DAGScheduler(
             val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
-              taskBinary, part, locs, stage.internalAccumulators, properties)
+              taskBinary, part, locs, stage.latestInfo.internalAccumulators, properties)
           }
 
         case stage: ResultStage =>
@@ -1046,7 +1039,7 @@ class DAGScheduler(
             val part = stage.rdd.partitions(p)
             val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptId,
-              taskBinary, part, locs, id, properties, stage.internalAccumulators)
+              taskBinary, part, locs, id, properties, stage.latestInfo.internalAccumulators)
           }
       }
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/dac40b68/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index a40b700..b6d4e39 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -75,22 +75,6 @@ private[scheduler] abstract class Stage(
   val name: String = callSite.shortForm
   val details: String = callSite.longForm
 
-  private var _internalAccumulators: Seq[Accumulator[_]] = Seq.empty
-
-  /** Internal accumulators shared across all tasks in this stage. */
-  def internalAccumulators: Seq[Accumulator[_]] = _internalAccumulators
-
-  /**
-   * Re-initialize the internal accumulators associated with this stage.
-   *
-   * This is called every time the stage is submitted, *except* when a subset of tasks
-   * belonging to this stage has already finished. Otherwise, reinitializing the internal
-   * accumulators here again will override partial values from the finished tasks.
-   */
-  def resetInternalAccumulators(): Unit = {
-    _internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
-  }
-
   /**
    * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
    * here, before any attempts have actually been created, because the DAGScheduler uses this
@@ -127,7 +111,8 @@ private[scheduler] abstract class Stage(
       numPartitionsToCompute: Int,
       taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
     _latestInfo = StageInfo.fromStage(
-      this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences)
+      this, nextAttemptId, Some(numPartitionsToCompute),
+      InternalAccumulator.createAll(rdd.sparkContext), taskLocalityPreferences)
     nextAttemptId += 1
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dac40b68/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 24796c1..0fd58c4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable.HashMap
 
+import org.apache.spark.Accumulator
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.storage.RDDInfo
 
@@ -35,6 +36,7 @@ class StageInfo(
     val rddInfos: Seq[RDDInfo],
     val parentIds: Seq[Int],
     val details: String,
+    val internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
@@ -42,7 +44,11 @@ class StageInfo(
   var completionTime: Option[Long] = None
   /** If the stage failed, the reason why. */
   var failureReason: Option[String] = None
-  /** Terminal values of accumulables updated during this stage. */
+
+  /**
+   * Terminal values of accumulables updated during this stage, including all the user-defined
+   * accumulators.
+   */
   val accumulables = HashMap[Long, AccumulableInfo]()
 
   def stageFailed(reason: String) {
@@ -75,6 +81,7 @@ private[spark] object StageInfo {
       stage: Stage,
       attemptId: Int,
       numTasks: Option[Int] = None,
+      internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
       taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
     ): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
@@ -87,6 +94,7 @@ private[spark] object StageInfo {
       rddInfos,
       stage.parents.map(_.id),
       stage.details,
+      internalAccumulators,
       taskLocalityPreferences)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dac40b68/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 645e2d2..bd4797a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -203,7 +203,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
         // This could be empty if the JobProgressListener hasn't received information about the
         // stage or if the stage information has been garbage collected
         listener.stageIdToInfo.getOrElse(stageId,
-          new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
+          new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown", Seq.empty))
       }
 
       val activeStages = Buffer[StageInfo]()

http://git-wip-us.apache.org/repos/asf/spark/blob/dac40b68/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 09d9553..3b78458 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -578,7 +578,9 @@ private[spark] object JsonProtocol {
     // The "Stage Infos" field was added in Spark 1.2.0
     val stageInfos = Utils.jsonOption(json \ "Stage Infos")
       .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
-        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
+        stageIds.map { id =>
+          new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown", Seq.empty)
+        }
       }
     SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }
@@ -686,7 +688,7 @@ private[spark] object JsonProtocol {
     }
 
     val stageInfo = new StageInfo(
-      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
+      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details, Seq.empty)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason

http://git-wip-us.apache.org/repos/asf/spark/blob/dac40b68/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 80a1de6..ee6b991 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -928,8 +928,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
       numTasks: Int,
       taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
     ): StageInfo = {
-    new StageInfo(
-      stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details", taskLocalityPreferences)
+    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details",
+      Seq.empty, taskLocalityPreferences)
   }
 
   private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {

http://git-wip-us.apache.org/repos/asf/spark/blob/dac40b68/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 00f3f15..cd7d2e1 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -337,7 +337,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     // first attempt -- its successful
     val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0,
       new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem,
-        InternalAccumulator.create(sc)))
+        InternalAccumulator.createAll(sc)))
     val data1 = (1 to 10).map { x => x -> x}
 
     // second attempt -- also successful.  We'll write out different data,
@@ -345,7 +345,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     // depending on what gets spilled, what gets combined, etc.
     val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0,
       new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem,
-        InternalAccumulator.create(sc)))
+        InternalAccumulator.createAll(sc)))
     val data2 = (11 to 20).map { x => x -> x}
 
     // interleave writes of both attempts -- we want to test that both attempts can occur
@@ -374,7 +374,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
 
     val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1,
       new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem,
-        InternalAccumulator.create(sc)))
+        InternalAccumulator.createAll(sc)))
     val readData = reader.read().toIndexedSeq
     assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dac40b68/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2293c11..fd96fb0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1144,7 +1144,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
     // the shuffle map output is still available from stage 0); make sure we've still got internal
     // accumulators setup
-    assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+    assert(scheduler.stageIdToStage(2).latestInfo.internalAccumulators.nonEmpty)
     completeShuffleMapStageSuccessfully(2, 0, 2)
     completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
     assert(results === Map(0 -> 1234, 1 -> 1235))

http://git-wip-us.apache.org/repos/asf/spark/blob/dac40b68/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 7db1f96..0168787 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -114,7 +114,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
       }
       val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0)
       val taskContext = new TaskContextImpl(
-        0, 0, 0, 0, taskMemoryManager, new Properties, null, InternalAccumulator.create(sc))
+        0, 0, 0, 0, taskMemoryManager, new Properties, null, InternalAccumulator.createAll(sc))
 
       val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
         taskContext,

