You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/11 19:51:09 UTC
spark git commit: [SPARK-17816][CORE][BRANCH-2.0] Fix
ConcurrentModificationException issue in BlockStatusesAccumulator
Repository: spark
Updated Branches:
refs/heads/branch-2.0 a6b5e1dcc -> 5ec3e6680
[SPARK-17816][CORE][BRANCH-2.0] Fix ConcurrentModificationException issue in BlockStatusesAccumulator
## What changes were proposed in this pull request?
Replaced `BlockStatusesAccumulator` with `CollectionAccumulator`, which is thread-safe, and made a few more cleanups.
## How was this patch tested?
Tested in master branch and cherry-picked.
Author: Ergin Seyfe <es...@fb.com>
Closes #15425 from seyfe/race_cond_jsonprotocal_branch-2.0.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ec3e668
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ec3e668
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ec3e668
Branch: refs/heads/branch-2.0
Commit: 5ec3e6680a091883369c002ae599d6b03f38c863
Parents: a6b5e1d
Author: Ergin Seyfe <es...@fb.com>
Authored: Tue Oct 11 12:51:08 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Oct 11 12:51:08 2016 -0700
----------------------------------------------------------------------
.../org/apache/spark/executor/TaskMetrics.scala | 42 +-------------------
.../org/apache/spark/util/AccumulatorV2.scala | 4 +-
.../org/apache/spark/util/JsonProtocol.scala | 2 +-
3 files changed, 6 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5ec3e668/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 52a3499..47aec44 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,8 +17,6 @@
package org.apache.spark.executor
-import java.util.{ArrayList, Collections}
-
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
@@ -27,7 +25,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.{BlockId, BlockStatus}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, AccumulatorV2, LongAccumulator}
+import org.apache.spark.util._
/**
@@ -54,7 +52,7 @@ class TaskMetrics private[spark] () extends Serializable {
private val _memoryBytesSpilled = new LongAccumulator
private val _diskBytesSpilled = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
- private val _updatedBlockStatuses = new BlockStatusesAccumulator
+ private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]
/**
* Time taken on the executor to deserialize this task.
@@ -305,39 +303,3 @@ private[spark] object TaskMetrics extends Logging {
tm
}
}
-
-
-private[spark] class BlockStatusesAccumulator
- extends AccumulatorV2[(BlockId, BlockStatus), java.util.List[(BlockId, BlockStatus)]] {
- private val _seq = Collections.synchronizedList(new ArrayList[(BlockId, BlockStatus)]())
-
- override def isZero(): Boolean = _seq.isEmpty
-
- override def copyAndReset(): BlockStatusesAccumulator = new BlockStatusesAccumulator
-
- override def copy(): BlockStatusesAccumulator = {
- val newAcc = new BlockStatusesAccumulator
- newAcc._seq.addAll(_seq)
- newAcc
- }
-
- override def reset(): Unit = _seq.clear()
-
- override def add(v: (BlockId, BlockStatus)): Unit = _seq.add(v)
-
- override def merge(
- other: AccumulatorV2[(BlockId, BlockStatus), java.util.List[(BlockId, BlockStatus)]]): Unit = {
- other match {
- case o: BlockStatusesAccumulator => _seq.addAll(o.value)
- case _ => throw new UnsupportedOperationException(
- s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
- }
- }
-
- override def value: java.util.List[(BlockId, BlockStatus)] = _seq
-
- def setValue(newValue: java.util.List[(BlockId, BlockStatus)]): Unit = {
- _seq.clear()
- _seq.addAll(newValue)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/5ec3e668/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 470d912..d3ddd39 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -444,7 +444,9 @@ class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
override def copy(): CollectionAccumulator[T] = {
val newAcc = new CollectionAccumulator[T]
- newAcc._list.addAll(_list)
+ _list.synchronized {
+ newAcc._list.addAll(_list)
+ }
newAcc
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5ec3e668/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 148635f..8861696 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -280,7 +280,7 @@ private[spark] object JsonProtocol {
("Getting Result Time" -> taskInfo.gettingResultTime) ~
("Finish Time" -> taskInfo.finishTime) ~
("Failed" -> taskInfo.failed) ~
- ("Accumulables" -> JArray(taskInfo.accumulables.map(accumulableInfoToJson).toList))
+ ("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
}
def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org