Posted to commits@spark.apache.org by rx...@apache.org on 2016/10/20 06:41:42 UTC

spark git commit: [SPARK-18003][SPARK CORE] Fix bug of RDD zipWithIndex & zipWithUniqueId index value overflowing

Repository: spark
Updated Branches:
  refs/heads/master f313117bc -> 39755169f


[SPARK-18003][SPARK CORE] Fix bug of RDD zipWithIndex & zipWithUniqueId index value overflowing

## What changes were proposed in this pull request?

- Fix a bug where RDD `zipWithIndex` generates an incorrect result when a single partition contains more than 2147483647 records.

- Fix a bug where RDD `zipWithUniqueId` generates an incorrect result when a single partition contains more than 2147483647 records (see the sketch below).
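
For context, a minimal sketch of the overflow (plain Scala, not part of this patch): Scala's `Iterator.zipWithIndex` returns `Iterator[(A, Int)]`, so the per-element index is a 32-bit `Int` and wraps negative once a partition holds more than 2147483647 elements, whereas a `Long`-based counter keeps counting:

```scala
// Minimal sketch, assuming plain Scala (not from the patch itself).
// Iterator.zipWithIndex carries its counter in an Int, so it is subject to
// the same wrap-around shown here for Int arithmetic.
println(Int.MaxValue + 1)        // -2147483648: where an Int index wraps to
println(Int.MaxValue.toLong + 1) // 2147483648: what a Long-based index yields
```

The patch adds `Utils.getIteratorZipWithIndex`, which tracks the index as a `Long`, and switches both `zipWithIndex` and `zipWithUniqueId` to use it.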

## How was this patch tested?

Unit test added in `UtilsSuite`.

Author: WeichenXu <We...@outlook.com>

Closes #15550 from WeichenXu123/fix_rdd_zipWithIndex_overflow.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39755169
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39755169
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39755169

Branch: refs/heads/master
Commit: 39755169fb5bb07332eef263b4c18ede1528812d
Parents: f313117
Author: WeichenXu <We...@outlook.com>
Authored: Wed Oct 19 23:41:38 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Oct 19 23:41:38 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala   |  2 +-
 .../org/apache/spark/rdd/ZippedWithIndexRDD.scala    |  5 ++---
 .../src/main/scala/org/apache/spark/util/Utils.scala | 15 +++++++++++++++
 .../scala/org/apache/spark/util/UtilsSuite.scala     |  7 +++++++
 4 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39755169/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 6dc334c..be11957 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1278,7 +1278,7 @@ abstract class RDD[T: ClassTag](
   def zipWithUniqueId(): RDD[(T, Long)] = withScope {
     val n = this.partitions.length.toLong
     this.mapPartitionsWithIndex { case (k, iter) =>
-      iter.zipWithIndex.map { case (item, i) =>
+      Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
         (item, i * n + k)
       }
     }
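
As a side note on the hunk above (illustration only, not part of the diff): in zipWithUniqueId the i-th element of partition k receives the id i * n + k, where n is the number of partitions, so the local index i must be a Long once a partition can exceed 2147483647 elements. A plain-Scala sketch of the id pattern, assuming n = 3 partitions:

    // Assumed example (not from the patch): the i-th element of partition k
    // gets unique id i * n + k, with n = number of partitions.
    val n = 3L
    for (k <- 0L until n) {
      val ids = (0L until 4L).map(i => i * n + k)
      println(s"partition $k -> ids ${ids.mkString(", ")}")
    }
    // partition 0 -> ids 0, 3, 6, 9
    // partition 1 -> ids 1, 4, 7, 10
    // partition 2 -> ids 2, 5, 8, 11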

http://git-wip-us.apache.org/repos/asf/spark/blob/39755169/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index b5738b9..b0e5ba0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -64,8 +64,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
 
   override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
     val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
-    firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
-      (x._1, split.startIndex + x._2)
-    }
+    val parentIter = firstParent[T].iterator(split.prev, context)
+    Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
   }
 }
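
A note on the hunk above (plain-Scala illustration with assumed partition sizes, not taken from the patch): split.startIndex is already a Long holding the element count of all preceding partitions, and Utils.getIteratorZipWithIndex now adds each element's local offset to it without routing that offset through an Int:

    // Assumed example sizes; each partition's start index is a running Long sum
    // of the sizes of the partitions before it.
    val partitionSizes = Seq(2L, 3L, 2L)
    val startIndices = partitionSizes.scanLeft(0L)(_ + _) // 0, 2, 5 (final 7 unused)
    partitionSizes.zip(startIndices).zipWithIndex.foreach { case ((size, start), k) =>
      val globalIds = (0L until size).map(start + _)
      println(s"partition $k -> indices ${globalIds.mkString(", ")}")
    }
    // partition 0 -> indices 0, 1
    // partition 1 -> indices 2, 3, 4
    // partition 2 -> indices 5, 6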

http://git-wip-us.apache.org/repos/asf/spark/blob/39755169/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7fba901..bfc6094 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1760,6 +1760,21 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Generate a zipWithIndex iterator, avoiding the index value overflow problem
+   * in Scala's zipWithIndex
+   */
+  def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
+    new Iterator[(T, Long)] {
+      var index: Long = startIndex - 1L
+      def hasNext: Boolean = iterator.hasNext
+      def next(): (T, Long) = {
+        index += 1L
+        (iterator.next(), index)
+      }
+    }
+  }
+
+  /**
    * Creates a symlink.
    *
    * @param src absolute path to the source
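
A hedged usage sketch of the new helper (hypothetical call site, not part of the patch; note that Utils is private[spark], so this only compiles from code inside the org.apache.spark package):

    // Hypothetical usage: wrap any iterator with a Long-valued index starting at
    // an arbitrary offset, e.g. a partition's startIndex.
    val zipped = Utils.getIteratorZipWithIndex(Iterator("a", "b", "c"), Int.MaxValue.toLong)
    zipped.foreach { case (item, idx) => println(s"$item -> $idx") }
    // a -> 2147483647
    // b -> 2147483648   (one past Int.MaxValue, no wrap-around)
    // c -> 2147483649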

http://git-wip-us.apache.org/repos/asf/spark/blob/39755169/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index b427f7f..4dda80f 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -396,6 +396,13 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(Utils.getIteratorSize(iterator) === 5L)
   }
 
+  test("getIteratorZipWithIndex") {
+    val iterator = Utils.getIteratorZipWithIndex(Iterator(0, 1, 2), -1L + Int.MaxValue)
+    assert(iterator.toArray === Array(
+      (0, -1L + Int.MaxValue), (1, 0L + Int.MaxValue), (2, 1L + Int.MaxValue)
+    ))
+  }
+
   test("doesDirectoryContainFilesNewerThan") {
     // create some temporary directories and files
     val parent: File = Utils.createTempDir()

