You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/09/26 03:24:08 UTC

git commit: SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap

Repository: spark
Updated Branches:
  refs/heads/master 0dc868e78 -> 86bce7649


SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap

MapOutputTrackerWorker.mapStatuses is accessed concurrently, so it must be thread-safe. This bug has already been fixed in #1328. However, because #1328 is unlikely to be merged soon, I am submitting this trivial fix separately so that the issue can be resolved sooner.

Author: zsxwing <zs...@gmail.com>

Closes #1541 from zsxwing/SPARK-2634 and squashes the following commits:

d450053 [zsxwing] SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86bce764
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86bce764
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86bce764

Branch: refs/heads/master
Commit: 86bce764983f2b14e1bd87fc3f4f938f7a217e1b
Parents: 0dc868e
Author: zsxwing <zs...@gmail.com>
Authored: Thu Sep 25 18:24:01 2014 -0700
Committer: Josh Rosen <jo...@apache.org>
Committed: Thu Sep 25 18:24:01 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/86bce764/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 51705c8..f92189b 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,10 +18,12 @@
 package org.apache.spark
 
 import java.io._
+import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.mutable.{HashSet, HashMap, Map}
 import scala.concurrent.Await
+import scala.collection.JavaConversions._
 
 import akka.actor._
 import akka.pattern.ask
@@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
    * On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
    * master's corresponding HashMap.
+   *
+   * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
+   * thread-safe map.
    */
   protected val mapStatuses: Map[Int, Array[MapStatus]]
 
@@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
  * MapOutputTrackerMaster.
  */
 private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
-  protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+  protected val mapStatuses: Map[Int, Array[MapStatus]] =
+    new ConcurrentHashMap[Int, Array[MapStatus]]
 }
 
 private[spark] object MapOutputTracker {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org