Posted to issues@spark.apache.org by "Andy Sloane (JIRA)" <ji...@apache.org> on 2016/03/03 09:55:18 UTC

[jira] [Comment Edited] (SPARK-13631) getPreferredLocations race condition in spark 1.6.0?

    [ https://issues.apache.org/jira/browse/SPARK-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177502#comment-15177502 ] 

Andy Sloane edited comment on SPARK-13631 at 3/3/16 8:55 AM:
-------------------------------------------------------------

Further bisecting with the reduceLocality flag forced to true confirmed it: SPARK-2774 / commit 96a7c888d806adfdb2c722025a1079ed7eaa2052 introduced the function that's failing here, with a note that it's known not to be thread-safe. But it seems we're calling it in a multithreaded context, and when the exception occurs, mapStatuses contains an entry for the shuffle, but it's an array full of nulls (I'm guessing this is because {{registerShuffle}} was called but none of the map outputs have been registered yet).
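
To make the suspected window concrete, here's a minimal self-contained sketch (not Spark's actual code; the names and signatures are simplified stand-ins for {{registerShuffle}} / {{registerMapOutput}} / {{getLocationsWithLargestOutputs}}) of how a reader can observe a shuffle that is already registered but whose status array is still all nulls:

{code}
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for MapOutputTrackerMaster's mapStatuses bookkeeping.
object AllNullsWindowSketch {
  private val mapStatuses = new ConcurrentHashMap[Int, Array[String]]()

  // registerShuffle analogue: allocates the slot array up front, every entry null.
  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    mapStatuses.put(shuffleId, new Array[String](numMaps))

  // registerMapOutput analogue: fills in one slot later, as each map task finishes.
  def registerMapOutput(shuffleId: Int, mapId: Int, location: String): Unit =
    mapStatuses.get(shuffleId)(mapId) = location

  // Reader analogue: like getLocationsWithLargestOutputs, it assumes non-null slots.
  def readLocations(shuffleId: Int): Seq[String] = {
    val statuses = mapStatuses.get(shuffleId)
    if (statuses == null) Seq.empty
    else statuses.map(_.toUpperCase).toSeq  // NPE on any slot that is still null
  }

  def main(args: Array[String]): Unit = {
    registerShuffle(0, numMaps = 2)
    // A concurrent reader arriving here sees the shuffle registered,
    // but every slot is still null -> NullPointerException.
    println(readLocations(0))
    registerMapOutput(0, 0, "host-a")
  }
}
{code}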

So, the following sort-of-lame patch seems to fix it, but I'm not totally sure about its correctness, or whether we're doing something as Spark clients that's getting us into a bad state (we have several threads running jobs concurrently; a sketch of that usage pattern follows the patch below):

{code}
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 72355cd..c0f1a36 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -394,28 +394,32 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       fractionThreshold: Double)
     : Option[Array[BlockManagerId]] = {
 
-    if (mapStatuses.contains(shuffleId)) {
-      val statuses = mapStatuses(shuffleId)
-      if (statuses.nonEmpty) {
-        // HashMap to add up sizes of all blocks at the same location
-        val locs = new HashMap[BlockManagerId, Long]
-        var totalOutputSize = 0L
-        var mapIdx = 0
-        while (mapIdx < statuses.length) {
-          val status = statuses(mapIdx)
-          val blockSize = status.getSizeForBlock(reducerId)
-          if (blockSize > 0) {
-            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
-            totalOutputSize += blockSize
+    val statuses = mapStatuses.get(shuffleId).orNull
+    if (statuses != null) {
+      statuses.synchronized {
+        if (statuses.nonEmpty) {
+          // HashMap to add up sizes of all blocks at the same location
+          val locs = new HashMap[BlockManagerId, Long]
+          var totalOutputSize = 0L
+          var mapIdx = 0
+          while (mapIdx < statuses.length) {
+            val status = statuses(mapIdx)
+            if (status != null) {
+              val blockSize = status.getSizeForBlock(reducerId)
+              if (blockSize > 0) {
+                locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+                totalOutputSize += blockSize
+              }
+            }
+            mapIdx = mapIdx + 1
+          }
+          val topLocs = locs.filter { case (loc, size) =>
+            size.toDouble / totalOutputSize >= fractionThreshold
+          }
+          // Return if we have any locations which satisfy the required threshold
+          if (topLocs.nonEmpty) {
+            return Some(topLocs.map(_._1).toArray)
           }
-          mapIdx = mapIdx + 1
-        }
-        val topLocs = locs.filter { case (loc, size) =>
-          size.toDouble / totalOutputSize >= fractionThreshold
-        }
-        // Return if we have any locations which satisfy the required threshold
-        if (topLocs.nonEmpty) {
-          return Some(topLocs.map(_._1).toArray)
         }
       }
     }
{code}
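
For reference, here is a hypothetical repro-style sketch (not our actual test case; the job shapes and names are made up) of the client pattern I mean: several driver-side threads submitting shuffle jobs concurrently against one SparkContext, so the reduce-side locality lookup can run while map outputs are still being registered:

{code}
import org.apache.spark.{SparkConf, SparkContext}

object ConcurrentShuffleJobs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[4]").setAppName("concurrent-shuffle-repro"))

    // Each thread runs an independent shuffle job on the shared SparkContext.
    val threads = (1 to 8).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = {
          sc.parallelize(1 to 100000, 16)
            .map(x => (x % 100, 1L))
            .reduceByKey(_ + _)  // shuffle: the reduce side asks for preferred locations
            .count()
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    sc.stop()
  }
}
{code}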



> getPreferredLocations race condition in spark 1.6.0?
> ----------------------------------------------------
>
>                 Key: SPARK-13631
>                 URL: https://issues.apache.org/jira/browse/SPARK-13631
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.6.0
>            Reporter: Andy Sloane
>
> We are seeing something that looks a lot like a regression from Spark 1.2. When we run jobs with multiple threads, we get a crash somewhere inside getPreferredLocations, similar to the one fixed in SPARK-4454, except that now it's inside org.apache.spark.MapOutputTrackerMaster.getLocationsWithLargestOutputs instead of DAGScheduler directly.
> I tried Spark 1.2 post-SPARK-4454 (before that patch it's only slightly flaky), 1.4.1, and 1.5.2, and all are fine. 1.6.0 immediately crashes on our threaded test case, though once in a while it passes.
> The stack trace is huge, but starts like this:
> Caused by: java.lang.NullPointerException: null
> 	at org.apache.spark.MapOutputTrackerMaster.getLocationsWithLargestOutputs(MapOutputTracker.scala:406)
> 	at org.apache.spark.MapOutputTrackerMaster.getPreferredLocationsForShuffle(MapOutputTracker.scala:366)
> 	at org.apache.spark.rdd.ShuffledRDD.getPreferredLocations(ShuffledRDD.scala:92)
> 	at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
> 	at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1545)
> The full trace is available here:
> https://gist.github.com/andy256/97611f19924bbf65cf49


