You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/09/26 03:24:08 UTC
git commit: SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap
Repository: spark
Updated Branches:
refs/heads/master 0dc868e78 -> 86bce7649
SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap
MapOutputTrackerWorker.mapStatuses is used concurrently, so it should be thread-safe. This bug has already been fixed in #1328. Nevertheless, since #1328 won't be merged soon, I am sending this trivial fix in the hope that this issue can be resolved soon.
Author: zsxwing <zs...@gmail.com>
Closes #1541 from zsxwing/SPARK-2634 and squashes the following commits:
d450053 [zsxwing] SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86bce764
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86bce764
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86bce764
Branch: refs/heads/master
Commit: 86bce764983f2b14e1bd87fc3f4f938f7a217e1b
Parents: 0dc868e
Author: zsxwing <zs...@gmail.com>
Authored: Thu Sep 25 18:24:01 2014 -0700
Committer: Josh Rosen <jo...@apache.org>
Committed: Thu Sep 25 18:24:01 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/86bce764/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 51705c8..f92189b 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,10 +18,12 @@
package org.apache.spark
import java.io._
+import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.concurrent.Await
+import scala.collection.JavaConversions._
import akka.actor._
import akka.pattern.ask
@@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
* master's corresponding HashMap.
+ *
+ * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
+ * thread-safe map.
*/
protected val mapStatuses: Map[Int, Array[MapStatus]]
@@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
* MapOutputTrackerMaster.
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
- protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+ protected val mapStatuses: Map[Int, Array[MapStatus]] =
+ new ConcurrentHashMap[Int, Array[MapStatus]]
}
private[spark] object MapOutputTracker {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org