You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/11/19 01:25:51 UTC

spark git commit: [SPARK-4433] fix a racing condition in zipWithIndex

Repository: spark
Updated Branches:
  refs/heads/master 4a377aff2 -> bb4604615


[SPARK-4433] fix a racing condition in zipWithIndex

Spark hangs with the following code:

~~~
sc.parallelize(1 to 10).zipWithIndex.repartition(10).count()
~~~

This is because ZippedWithIndexRDD triggers a job in getPartitions and it causes a deadlock in DAGScheduler.getPreferredLocs (synced). The fix is to compute `startIndices` during construction.

This should be applied to branch-1.0, branch-1.1, and branch-1.2.

pwendell

Author: Xiangrui Meng <me...@databricks.com>

Closes #3291 from mengxr/SPARK-4433 and squashes the following commits:

c284d9f [Xiangrui Meng] fix a racing condition in zipWithIndex


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb460461
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb460461
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb460461

Branch: refs/heads/master
Commit: bb46046154a438df4db30a0e1fd557bd3399ee7b
Parents: 4a377af
Author: Xiangrui Meng <me...@databricks.com>
Authored: Tue Nov 18 16:25:44 2014 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Tue Nov 18 16:25:44 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/rdd/ZippedWithIndexRDD.scala   | 31 +++++++++++---------
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |  5 ++++
 2 files changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb460461/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index e2c3016..8c43a55 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -39,21 +39,24 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
 private[spark]
 class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {
 
-  override def getPartitions: Array[Partition] = {
+  /** The start index of each partition. */
+  @transient private val startIndices: Array[Long] = {
     val n = prev.partitions.size
-    val startIndices: Array[Long] =
-      if (n == 0) {
-        Array[Long]()
-      } else if (n == 1) {
-        Array(0L)
-      } else {
-        prev.context.runJob(
-          prev,
-          Utils.getIteratorSize _,
-          0 until n - 1, // do not need to count the last partition
-          false
-        ).scanLeft(0L)(_ + _)
-      }
+    if (n == 0) {
+      Array[Long]()
+    } else if (n == 1) {
+      Array(0L)
+    } else {
+      prev.context.runJob(
+        prev,
+        Utils.getIteratorSize _,
+        0 until n - 1, // do not need to count the last partition
+        allowLocal = false
+      ).scanLeft(0L)(_ + _)
+    }
+  }
+
+  override def getPartitions: Array[Partition] = {
     firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bb460461/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6d2e696..e079ca3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -739,6 +739,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  test("zipWithIndex chained with other RDDs (SPARK-4433)") {
+    val count = sc.parallelize(0 until 10, 2).zipWithIndex().repartition(4).count()
+    assert(count === 10)
+  }
+
   test("zipWithUniqueId") {
     val n = 10
     val data = sc.parallelize(0 until n, 3)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org