Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/11 19:51:09 UTC

spark git commit: [SPARK-17816][CORE][BRANCH-2.0] Fix ConcurrentModificationException issue in BlockStatusesAccumulator

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a6b5e1dcc -> 5ec3e6680


[SPARK-17816][CORE][BRANCH-2.0] Fix ConcurrentModificationException issue in BlockStatusesAccumulator

## What changes were proposed in this pull request?
Replaced `BlockStatusesAccumulator` with `CollectionAccumulator`, which is thread safe, plus a few more cleanups.
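
For context on the race: `Collections.synchronizedList` only guards individual method calls, so iterating the list (as the serialization path does) while another thread appends to it can still throw `ConcurrentModificationException`. A minimal standalone sketch of the failure and the safe pattern (illustrative names, not code from this patch):

```scala
import java.util.{ArrayList, Collections, ConcurrentModificationException}

import scala.collection.JavaConverters._

object SynchronizedListRace {
  def main(args: Array[String]): Unit = {
    val list = Collections.synchronizedList(new ArrayList[Int]())

    // Writer thread keeps appending, like a task adding block statuses.
    val writer = new Thread(new Runnable {
      override def run(): Unit = (1 to 1000000).foreach(i => list.add(i))
    })
    writer.start()

    try {
      // Reader iterates without holding the list's monitor, like the old
      // serialization path; this can throw ConcurrentModificationException.
      while (writer.isAlive) list.asScala.foreach(identity)
      println("No race observed this run (timing dependent).")
    } catch {
      case e: ConcurrentModificationException => println(s"Race observed: $e")
    } finally {
      writer.join()
    }

    // Safe pattern: hold the list's own monitor while taking a snapshot.
    val snapshot = list.synchronized(list.asScala.toList)
    println(s"Snapshot size: ${snapshot.size}")
  }
}
```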

## How was this patch tested?
Tested in the master branch and cherry-picked.

Author: Ergin Seyfe <es...@fb.com>

Closes #15425 from seyfe/race_cond_jsonprotocal_branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ec3e668
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ec3e668
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ec3e668

Branch: refs/heads/branch-2.0
Commit: 5ec3e6680a091883369c002ae599d6b03f38c863
Parents: a6b5e1d
Author: Ergin Seyfe <es...@fb.com>
Authored: Tue Oct 11 12:51:08 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Oct 11 12:51:08 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/TaskMetrics.scala | 42 +-------------------
 .../org/apache/spark/util/AccumulatorV2.scala   |  4 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  2 +-
 3 files changed, 6 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ec3e668/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 52a3499..47aec44 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.executor
 
-import java.util.{ArrayList, Collections}
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
 
@@ -27,7 +25,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.{BlockId, BlockStatus}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, AccumulatorV2, LongAccumulator}
+import org.apache.spark.util._
 
 
 /**
@@ -54,7 +52,7 @@ class TaskMetrics private[spark] () extends Serializable {
   private val _memoryBytesSpilled = new LongAccumulator
   private val _diskBytesSpilled = new LongAccumulator
   private val _peakExecutionMemory = new LongAccumulator
-  private val _updatedBlockStatuses = new BlockStatusesAccumulator
+  private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]
 
   /**
    * Time taken on the executor to deserialize this task.
@@ -305,39 +303,3 @@ private[spark] object TaskMetrics extends Logging {
     tm
   }
 }
-
-
-private[spark] class BlockStatusesAccumulator
-  extends AccumulatorV2[(BlockId, BlockStatus), java.util.List[(BlockId, BlockStatus)]] {
-  private val _seq = Collections.synchronizedList(new ArrayList[(BlockId, BlockStatus)]())
-
-  override def isZero(): Boolean = _seq.isEmpty
-
-  override def copyAndReset(): BlockStatusesAccumulator = new BlockStatusesAccumulator
-
-  override def copy(): BlockStatusesAccumulator = {
-    val newAcc = new BlockStatusesAccumulator
-    newAcc._seq.addAll(_seq)
-    newAcc
-  }
-
-  override def reset(): Unit = _seq.clear()
-
-  override def add(v: (BlockId, BlockStatus)): Unit = _seq.add(v)
-
-  override def merge(
-    other: AccumulatorV2[(BlockId, BlockStatus), java.util.List[(BlockId, BlockStatus)]]): Unit = {
-    other match {
-      case o: BlockStatusesAccumulator => _seq.addAll(o.value)
-      case _ => throw new UnsupportedOperationException(
-        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
-    }
-  }
-
-  override def value: java.util.List[(BlockId, BlockStatus)] = _seq
-
-  def setValue(newValue: java.util.List[(BlockId, BlockStatus)]): Unit = {
-    _seq.clear()
-    _seq.addAll(newValue)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/5ec3e668/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 470d912..d3ddd39 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -444,7 +444,9 @@ class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
 
   override def copy(): CollectionAccumulator[T] = {
     val newAcc = new CollectionAccumulator[T]
-    newAcc._list.addAll(_list)
+    _list.synchronized {
+      newAcc._list.addAll(_list)
+    }
     newAcc
   }
 

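The `copy()` fix above follows the `Collections.synchronizedList` contract: a bulk read that traverses the wrapped list must hold the list's monitor to see a consistent snapshot under concurrent writes. A minimal sketch of the same pattern in isolation (`copyOf` is an illustrative helper, not part of Spark):

```scala
import java.util.{ArrayList, Collections}

object SafeCopy {
  // The destination's addAll reads the source, so the *source* list's
  // monitor is held for the duration of the bulk read.
  def copyOf[T](src: java.util.List[T]): java.util.List[T] = {
    val dst = Collections.synchronizedList(new ArrayList[T]())
    src.synchronized {
      dst.addAll(src)
    }
    dst
  }
}
```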
http://git-wip-us.apache.org/repos/asf/spark/blob/5ec3e668/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 148635f..8861696 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -280,7 +280,7 @@ private[spark] object JsonProtocol {
     ("Getting Result Time" -> taskInfo.gettingResultTime) ~
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
-    ("Accumulables" -> JArray(taskInfo.accumulables.map(accumulableInfoToJson).toList))
+    ("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
   }
 
   def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {

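The one-line `JsonProtocol` change reorders snapshot and traversal: `taskInfo.accumulables` is a mutable buffer that another thread may append to, and `.map` iterates it directly, so calling `.toList` first captures an immutable snapshot that can then be mapped safely. A simplified sketch, with `render` as an illustrative stand-in for `accumulableInfoToJson`:

```scala
import scala.collection.mutable.ListBuffer

object SnapshotThenMap {
  def main(args: Array[String]): Unit = {
    val accumulables = ListBuffer(1, 2, 3) // in Spark, appended to concurrently

    // Before the fix: accumulables.map(render).toList iterates the live buffer.
    // After the fix: snapshot first, then map the immutable List.
    val rendered = accumulables.toList.map(render)
    println(rendered)
  }

  // Stand-in for JsonProtocol.accumulableInfoToJson.
  private def render(x: Int): String = s"value=$x"
}
```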

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org