You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/10/20 06:41:42 UTC
spark git commit: [SPARK-18003][SPARK CORE] Fix bug of RDD
zipWithIndex & zipWithUniqueId index value overflowing
Repository: spark
Updated Branches:
refs/heads/master f313117bc -> 39755169f
[SPARK-18003][SPARK CORE] Fix bug of RDD zipWithIndex & zipWithUniqueId index value overflowing
## What changes were proposed in this pull request?
- Fix a bug where RDD `zipWithIndex` generates incorrect results when one partition contains more than 2147483647 records.
- Fix a bug where RDD `zipWithUniqueId` generates incorrect results when one partition contains more than 2147483647 records.
## How was this patch tested?
A unit test was added.
Author: WeichenXu <We...@outlook.com>
Closes #15550 from WeichenXu123/fix_rdd_zipWithIndex_overflow.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39755169
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39755169
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39755169
Branch: refs/heads/master
Commit: 39755169fb5bb07332eef263b4c18ede1528812d
Parents: f313117
Author: WeichenXu <We...@outlook.com>
Authored: Wed Oct 19 23:41:38 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Oct 19 23:41:38 2016 -0700
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +-
.../org/apache/spark/rdd/ZippedWithIndexRDD.scala | 5 ++---
.../src/main/scala/org/apache/spark/util/Utils.scala | 15 +++++++++++++++
.../scala/org/apache/spark/util/UtilsSuite.scala | 7 +++++++
4 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/39755169/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 6dc334c..be11957 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1278,7 +1278,7 @@ abstract class RDD[T: ClassTag](
def zipWithUniqueId(): RDD[(T, Long)] = withScope {
val n = this.partitions.length.toLong
this.mapPartitionsWithIndex { case (k, iter) =>
- iter.zipWithIndex.map { case (item, i) =>
+ Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
(item, i * n + k)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/39755169/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index b5738b9..b0e5ba0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -64,8 +64,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
- firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
- (x._1, split.startIndex + x._2)
- }
+ val parentIter = firstParent[T].iterator(split.prev, context)
+ Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/39755169/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7fba901..bfc6094 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1760,6 +1760,21 @@ private[spark] object Utils extends Logging {
}
/**
+ * Generate a zipWithIndex iterator, avoid index value overflowing problem
+ * in scala's zipWithIndex
+ */
+ def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
+ new Iterator[(T, Long)] {
+ var index: Long = startIndex - 1L
+ def hasNext: Boolean = iterator.hasNext
+ def next(): (T, Long) = {
+ index += 1L
+ (iterator.next(), index)
+ }
+ }
+ }
+
+ /**
* Creates a symlink.
*
* @param src absolute path to the source
http://git-wip-us.apache.org/repos/asf/spark/blob/39755169/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index b427f7f..4dda80f 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -396,6 +396,13 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(Utils.getIteratorSize(iterator) === 5L)
}
+ test("getIteratorZipWithIndex") {
+ val iterator = Utils.getIteratorZipWithIndex(Iterator(0, 1, 2), -1L + Int.MaxValue)
+ assert(iterator.toArray === Array(
+ (0, -1L + Int.MaxValue), (1, 0L + Int.MaxValue), (2, 1L + Int.MaxValue)
+ ))
+ }
+
test("doesDirectoryContainFilesNewerThan") {
// create some temporary directories and files
val parent: File = Utils.createTempDir()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org