Posted to commits@spark.apache.org by jo...@apache.org on 2016/02/09 02:23:43 UTC

spark git commit: [SPARK-10620][SPARK-13054] Minor addendum to #10835

Repository: spark
Updated Branches:
  refs/heads/master ff0af0ddf -> eeaf45b92


[SPARK-10620][SPARK-13054] Minor addendum to #10835

Additional changes to #10835, mainly related to style and visibility. This patch also adds back a few deprecated methods for backward compatibility.
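
For context on the backward-compatibility part: the restored methods in TaskMetrics.scala below follow Scala's setter naming convention (foo_=), so pre-2.0 user code that assigned to these fields directly keeps compiling and merely emits a deprecation warning. A minimal caller sketch (hypothetical user code, not part of this patch):

    import org.apache.spark.executor.{OutputMetrics, TaskMetrics}

    // Hypothetical 1.x-era caller. The assignment below desugars to
    // tm.outputMetrics_=(Some(om)), which this patch adds back as a
    // @deprecated method, so the code still compiles with a warning.
    def patchMetrics(tm: TaskMetrics, om: OutputMetrics): Unit = {
      tm.outputMetrics = Some(om)
    }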

Author: Andrew Or <an...@databricks.com>

Closes #10958 from andrewor14/task-metrics-to-accums-followups.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eeaf45b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eeaf45b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eeaf45b9

Branch: refs/heads/master
Commit: eeaf45b92695c577279f3a17d8c80ee40425e9aa
Parents: ff0af0d
Author: Andrew Or <an...@databricks.com>
Authored: Mon Feb 8 17:23:33 2016 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Feb 8 17:23:33 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulator.scala    | 11 +++----
 .../org/apache/spark/InternalAccumulator.scala  |  4 +--
 .../org/apache/spark/TaskContextImpl.scala      |  2 +-
 .../scala/org/apache/spark/TaskEndReason.scala  |  2 +-
 .../org/apache/spark/executor/Executor.scala    | 16 +++++------
 .../org/apache/spark/executor/TaskMetrics.scala | 20 +++++++++++--
 .../org/apache/spark/scheduler/ResultTask.scala |  2 +-
 .../apache/spark/scheduler/SparkListener.scala  |  1 +
 .../org/apache/spark/ui/jobs/StagePage.scala    |  6 ++--
 .../org/apache/spark/util/JsonProtocol.scala    |  2 +-
 .../org/apache/spark/AccumulatorSuite.scala     |  2 +-
 .../apache/spark/InternalAccumulatorSuite.scala |  6 ++--
 .../spark/executor/TaskMetricsSuite.scala       | 30 ++++++++++----------
 .../spark/scheduler/TaskContextSuite.scala      |  2 +-
 .../ui/jobs/JobProgressListenerSuite.scala      |  2 +-
 .../apache/spark/util/JsonProtocolSuite.scala   |  4 +--
 project/MimaExcludes.scala                      |  3 +-
 17 files changed, 66 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/main/scala/org/apache/spark/Accumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 558bd44..5e8f1d4 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -60,19 +60,20 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
  * @tparam T result type
  */
 class Accumulator[T] private[spark] (
-    @transient private[spark] val initialValue: T,
+    // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
+    @transient private val initialValue: T,
     param: AccumulatorParam[T],
     name: Option[String],
     internal: Boolean,
-    override val countFailedValues: Boolean = false)
+    private[spark] override val countFailedValues: Boolean = false)
   extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {
 
   def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
-    this(initialValue, param, name, false)
+    this(initialValue, param, name, false /* internal */)
   }
 
   def this(initialValue: T, param: AccumulatorParam[T]) = {
-    this(initialValue, param, None, false)
+    this(initialValue, param, None, false /* internal */)
   }
 }
 
@@ -84,7 +85,7 @@ private[spark] object Accumulators extends Logging {
    * This global map holds the original accumulator objects that are created on the driver.
    * It keeps weak references to these objects so that accumulators can be garbage-collected
    * once the RDDs and user-code that reference them are cleaned up.
-   * TODO: Don't use a global map; these should be tied to a SparkContext at the very least.
+   * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
    */
   @GuardedBy("Accumulators")
   val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
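
The weak-reference registry described in the comment above can be illustrated in isolation; a rough sketch of the idea (simplified stand-in, not Spark's actual class):

    import java.lang.ref.WeakReference
    import scala.collection.mutable

    // Simplified sketch of a weak-reference registry: entries do not keep
    // their values alive, so an object becomes collectible once user code
    // drops its last strong reference to it.
    object Registry {
      private val originals = mutable.Map[Long, WeakReference[AnyRef]]()

      def register(id: Long, obj: AnyRef): Unit = synchronized {
        originals(id) = new WeakReference(obj)
      }

      // Returns None if the entry is missing or was already collected.
      def get(id: Long): Option[AnyRef] = synchronized {
        originals.get(id).flatMap(ref => Option(ref.get()))
      }
    }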

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index c191122..7aa9057 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -119,7 +119,7 @@ private[spark] object InternalAccumulator {
   /**
    * Accumulators for tracking internal metrics.
    */
-  def create(): Seq[Accumulator[_]] = {
+  def createAll(): Seq[Accumulator[_]] = {
     Seq[String](
       EXECUTOR_DESERIALIZE_TIME,
       EXECUTOR_RUN_TIME,
@@ -188,7 +188,7 @@ private[spark] object InternalAccumulator {
    * values across all tasks within each stage.
    */
   def create(sc: SparkContext): Seq[Accumulator[_]] = {
-    val accums = create()
+    val accums = createAll()
     accums.foreach { accum =>
       Accumulators.register(accum)
       sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 27ca46f..1d228b6 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -32,7 +32,7 @@ private[spark] class TaskContextImpl(
     override val attemptNumber: Int,
     override val taskMemoryManager: TaskMemoryManager,
     @transient private val metricsSystem: MetricsSystem,
-    initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.create())
+    initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll())
   extends TaskContext
   with Logging {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 68340cc..c8f201e 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -118,7 +118,7 @@ case class ExceptionFailure(
     description: String,
     stackTrace: Array[StackTraceElement],
     fullStackTrace: String,
-    exceptionWrapper: Option[ThrowableSerializationWrapper],
+    private val exceptionWrapper: Option[ThrowableSerializationWrapper],
     accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo])
   extends TaskFailedReason {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 51c000e..00be3a2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -300,15 +300,15 @@ private[spark] class Executor(
 
           // Collect latest accumulator values to report back to the driver
           val accumulatorUpdates: Seq[AccumulableInfo] =
-          if (task != null) {
-            task.metrics.foreach { m =>
-              m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
-              m.setJvmGCTime(computeTotalGcTime() - startGCTime)
+            if (task != null) {
+              task.metrics.foreach { m =>
+                m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+                m.setJvmGCTime(computeTotalGcTime() - startGCTime)
+              }
+              task.collectAccumulatorUpdates(taskFailed = true)
+            } else {
+              Seq.empty[AccumulableInfo]
             }
-            task.collectAccumulatorUpdates(taskFailed = true)
-          } else {
-            Seq.empty[AccumulableInfo]
-          }
 
           val serializedTaskEndReason = {
             try {

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 0a6ebcb..8ff0620 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -45,13 +45,12 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
  *                      these requirements.
  */
 @DeveloperApi
-class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
-
+class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Serializable {
   import InternalAccumulator._
 
   // Needed for Java tests
   def this() {
-    this(InternalAccumulator.create())
+    this(InternalAccumulator.createAll())
   }
 
   /**
@@ -144,6 +143,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
     if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None
   }
 
+  @deprecated("setting updated blocks is not allowed", "2.0.0")
+  def updatedBlocks_=(blocks: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
+    blocks.foreach(setUpdatedBlockStatuses)
+  }
+
   // Setters and increment-ers
   private[spark] def setExecutorDeserializeTime(v: Long): Unit =
     _executorDeserializeTime.setValue(v)
@@ -220,6 +224,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
    */
   def outputMetrics: Option[OutputMetrics] = _outputMetrics
 
+  @deprecated("setting OutputMetrics is for internal use only", "2.0.0")
+  def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
+    _outputMetrics = om
+  }
+
   /**
    * Get or create a new [[OutputMetrics]] associated with this task.
    */
@@ -296,6 +305,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
    */
   def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics
 
+  @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
+  def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
+    _shuffleWriteMetrics = swm
+  }
+
   /**
    * Get or create a new [[ShuffleWriteMetrics]] associated with this task.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 885f70e..cd2736e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -49,7 +49,7 @@ private[spark] class ResultTask[T, U](
     partition: Partition,
     locs: Seq[TaskLocation],
     val outputId: Int,
-    _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.create())
+    _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll())
   extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums)
   with Serializable {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 7b09c2e..0a45ef5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -61,6 +61,7 @@ case class SparkListenerTaskEnd(
     taskType: String,
     reason: TaskEndReason,
     taskInfo: TaskInfo,
+    // may be null if the task has failed
     @Nullable taskMetrics: TaskMetrics)
   extends SparkListenerEvent
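
Since taskMetrics may be null for failed tasks, listeners should guard before dereferencing it. An illustrative listener (hypothetical, not part of this patch):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Hypothetical listener: wrap the possibly-null metrics in an Option
    // rather than dereferencing them unconditionally.
    class RunTimeLogger extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        Option(taskEnd.taskMetrics).foreach { m =>
          println(s"task ${taskEnd.taskInfo.taskId} ran ${m.executorRunTime} ms")
        }
      }
    }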
 

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 29c5ff0..0b68b88 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -408,9 +408,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
             </td> +:
             getFormattedTimeQuantiles(gettingResultTimes)
 
-            val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
-              metrics.get.peakExecutionMemory.toDouble
-            }
+          val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
+            metrics.get.peakExecutionMemory.toDouble
+          }
           val peakExecutionMemoryQuantiles = {
             <td>
               <span data-toggle="tooltip"

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 38e6478..09d9553 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -236,7 +236,7 @@ private[spark] object JsonProtocol {
     val accumUpdates = metricsUpdate.accumUpdates
     ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
     ("Executor ID" -> execId) ~
-      ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
+    ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
       ("Task ID" -> taskId) ~
       ("Stage ID" -> stageId) ~
       ("Stage Attempt ID" -> stageAttemptId) ~

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index e0fdd45..4d49fe5 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -268,7 +268,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
     val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
     val externalAccums = Seq(acc1, acc2)
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
     // Set some values; these should not be observed later on the "executors"
     acc1.setValue(10)
     acc2.setValue(20L)

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 44a16e2..c426bb7 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -87,7 +87,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("create") {
-    val accums = create()
+    val accums = createAll()
     val shuffleReadAccums = createShuffleReadAccums()
     val shuffleWriteAccums = createShuffleWriteAccums()
     val inputAccums = createInputAccums()
@@ -123,7 +123,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("naming") {
-    val accums = create()
+    val accums = createAll()
     val shuffleReadAccums = createShuffleReadAccums()
     val shuffleWriteAccums = createShuffleWriteAccums()
     val inputAccums = createInputAccums()
@@ -291,7 +291,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
     }
     assert(Accumulators.originals.isEmpty)
     sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
     // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
     assert(Accumulators.originals.size === internalAccums.size * 2)
     val accumsRegistered = sc.cleaner match {

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 67c4595..3a1a67c 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -31,7 +31,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   import TaskMetricsSuite._
 
   test("create") {
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
     val tm1 = new TaskMetrics
     val tm2 = new TaskMetrics(internalAccums)
     assert(tm1.accumulatorUpdates().size === internalAccums.size)
@@ -51,7 +51,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   test("create with unnamed accum") {
     intercept[IllegalArgumentException] {
       new TaskMetrics(
-        InternalAccumulator.create() ++ Seq(
+        InternalAccumulator.createAll() ++ Seq(
           new Accumulator(0, IntAccumulatorParam, None, internal = true)))
     }
   }
@@ -59,7 +59,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   test("create with duplicate name accum") {
     intercept[IllegalArgumentException] {
       new TaskMetrics(
-        InternalAccumulator.create() ++ Seq(
+        InternalAccumulator.createAll() ++ Seq(
           new Accumulator(0, IntAccumulatorParam, Some(RESULT_SIZE), internal = true)))
     }
   }
@@ -67,7 +67,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   test("create with external accum") {
     intercept[IllegalArgumentException] {
       new TaskMetrics(
-        InternalAccumulator.create() ++ Seq(
+        InternalAccumulator.createAll() ++ Seq(
           new Accumulator(0, IntAccumulatorParam, Some("x"))))
     }
   }
@@ -131,7 +131,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   }
 
   test("mutating values") {
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(accums)
     // initial values
     assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 0L)
@@ -180,7 +180,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("mutating shuffle read metrics values") {
     import shuffleRead._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(accums)
     def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
       assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value)
@@ -234,7 +234,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("mutating shuffle write metrics values") {
     import shuffleWrite._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(accums)
     def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
       assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value)
@@ -267,7 +267,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("mutating input metrics values") {
     import input._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(accums)
     def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
       assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value,
@@ -296,7 +296,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("mutating output metrics values") {
     import output._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(accums)
     def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
       assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value,
@@ -381,7 +381,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   }
 
   test("additional accumulables") {
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(internalAccums)
     assert(tm.accumulatorUpdates().size === internalAccums.size)
     val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
@@ -419,7 +419,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("existing values in shuffle read accums") {
     // set shuffle read accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val srAccum = accums.find(_.name === Some(shuffleRead.FETCH_WAIT_TIME))
     assert(srAccum.isDefined)
     srAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -432,7 +432,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("existing values in shuffle write accums") {
     // set shuffle write accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val swAccum = accums.find(_.name === Some(shuffleWrite.RECORDS_WRITTEN))
     assert(swAccum.isDefined)
     swAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -445,7 +445,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("existing values in input accums") {
     // set input accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val inAccum = accums.find(_.name === Some(input.RECORDS_READ))
     assert(inAccum.isDefined)
     inAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -458,7 +458,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("existing values in output accums") {
     // set output accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val outAccum = accums.find(_.name === Some(output.RECORDS_WRITTEN))
     assert(outAccum.isDefined)
     outAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -470,7 +470,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   }
 
   test("from accumulator updates") {
-    val accumUpdates1 = InternalAccumulator.create().map { a =>
+    val accumUpdates1 = InternalAccumulator.createAll().map { a =>
       AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
     }
     val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index b3bb86d..850e470 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -127,7 +127,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     val param = AccumulatorParam.LongAccumulatorParam
     val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
     val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
-    val initialAccums = InternalAccumulator.create()
+    val initialAccums = InternalAccumulator.createAll()
     // Create a dummy task. We won't end up running this; we just want to collect
     // accumulator updates from it.
     val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]]) {

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 18a16a2..9876bde 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -269,7 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     val execId = "exe-1"
 
     def makeTaskMetrics(base: Int): TaskMetrics = {
-      val accums = InternalAccumulator.create()
+      val accums = InternalAccumulator.createAll()
       accums.foreach(Accumulators.register)
       val taskMetrics = new TaskMetrics(accums)
       val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 48951c3..de6f408 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -508,7 +508,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
 
   /** -------------------------------- *
    | Util methods for comparing events |
-    * --------------------------------- */
+   * --------------------------------- */
 
   private[spark] def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) {
     (event1, event2) match {
@@ -773,7 +773,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
 
   /** ----------------------------------- *
    | Util methods for constructing events |
-    * ------------------------------------ */
+   * ------------------------------------ */
 
   private val properties = {
     val p = new Properties

http://git-wip-us.apache.org/repos/asf/spark/blob/eeaf45b9/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 8b1a730..9209094 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -187,7 +187,8 @@ object MimaExcludes {
       ) ++ Seq(
         // SPARK-12896 Send only accumulator updates to driver, not TaskMetrics
         ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulable.this"),
-        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this")
+        ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.Accumulator.initialValue")
       ) ++ Seq(
         // SPARK-12692 Scala style: Fix the style violation (Space before "," or ":")
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log_"),

