Posted to commits@spark.apache.org by an...@apache.org on 2015/01/16 18:24:50 UTC

[2/2] spark git commit: [SPARK-5201][CORE] deal with int overflow in the ParallelCollectionRDD.slice method

[SPARK-5201][CORE] deal with int overflow in the ParallelCollectionRDD.slice method

There is an int overflow in the ParallelCollectionRDD.slice method, originally reported by SaintBacchus:
```
sc.makeRDD(1 to (Int.MaxValue)).count       // result = 0
sc.makeRDD(1 to (Int.MaxValue - 1)).count   // result = 2147483646 = Int.MaxValue - 1
sc.makeRDD(1 until (Int.MaxValue)).count    // result = 2147483646 = Int.MaxValue - 1
```
see https://github.com/apache/spark/pull/2874 for more details.
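
For context, here is a minimal standalone sketch (not part of the patch) of how the overflow arises: the old slice logic converted an inclusive Range into an exclusive one by nudging the end by the sign of the step, which wraps around when the end is Int.MaxValue.
```
// Standalone sketch of the root cause; mirrors the code removed in the diff below.
val r = 1 to Int.MaxValue                     // Range.Inclusive of length Int.MaxValue
val sign = if (r.step < 0) -1 else 1
// Int.MaxValue + 1 wraps around to Int.MinValue, so the "equivalent" exclusive
// range is empty, and every slice built from it (and hence the count) is empty too.
val converted = Range(r.start, r.end + sign, r.step)
converted.length                              // 0
```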
This PR tries to fix the overflow. However, there's another issue I don't address:
```
val largeRange = Int.MinValue to Int.MaxValue
largeRange.length // throws java.lang.IllegalArgumentException: -2147483648 to 2147483647 by 1: seqs cannot contain more than Int.MaxValue elements.
```

So the range we feed to sc.makeRDD cannot contain more than Int.MaxValue elements. This is a limitation of Scala. I think we may want to support that kind of range eventually, but that fix is beyond the scope of this PR.
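
For clarity, here is a self-contained sketch of the slicing approach the patch adopts for Range inputs. It mirrors the ParallelCollectionRDD diff further down; the local `positions` helper is an assumption standing in for the private helper of the same name in ParallelCollectionRDD.
```
// Sketch of the patched behaviour: keep the last slice inclusive when the
// input Range is inclusive, so r.end + 1 is never computed.
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }

def sliceRange(r: Range, numSlices: Int): Seq[Range] =
  positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
    if (r.isInclusive && index == numSlices - 1) {
      Range.inclusive(r.start + start * r.step, r.end, r.step)
    } else {
      Range(r.start + start * r.step, r.start + end * r.step, r.step)
    }
  }.toSeq

sliceRange(1 to Int.MaxValue, 3).map(_.size.toLong).sum   // 2147483647, no longer 0
```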

srowen andrewor14, would you mind taking a look at this PR?

Author: Ye Xianjin <ad...@gmail.com>

Closes #4002 from advancedxy/SPARk-5201 and squashes the following commits:

96265a1 [Ye Xianjin] Update slice method comment and some responding docs.
e143d7a [Ye Xianjin] Update inclusive range check for splitting inclusive range.
b3f5577 [Ye Xianjin] We can include the last element in the last slice in general for inclusive range, hence eliminate the need to check Int.MaxValue or Int.MinValue.
7d39b9e [Ye Xianjin] Convert the two cases pattern matching to one case.
651c959 [Ye Xianjin] rename sign to needsInclusiveRange. add some comments
196f8a8 [Ye Xianjin] Add test cases for ranges end with Int.MaxValue or Int.MinValue
e66e60a [Ye Xianjin] Deal with inclusive and exclusive ranges in one case. If the range is inclusive and the end of the range is (Int.MaxValue or Int.MinValue), we should use inclusive range instead of exclusive


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e38cb296
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e38cb296
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e38cb296

Branch: refs/heads/branch-1.2
Commit: e38cb29694d96621940c29016e5c30c1776b8fb0
Parents: 89a0990
Author: Ye Xianjin <ad...@gmail.com>
Authored: Fri Jan 16 09:20:53 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Jan 16 09:24:44 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  7 +++---
 .../spark/rdd/ParallelCollectionRDD.scala       | 21 +++++++---------
 .../rdd/ParallelCollectionSplitSuite.scala      | 25 ++++++++++++++++++++
 3 files changed, 37 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e38cb296/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1f85319..c2eff5c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -514,10 +514,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /** Distribute a local Scala collection to form an RDD.
    *
-   * @note Parallelize acts lazily. If `seq` is a mutable collection and is
-   * altered after the call to parallelize and before the first action on the
-   * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
-   * the argument to avoid this.
+   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
+   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
+   * modified collection. Pass a copy of the argument to avoid this.
    */
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
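
As an aside on the @note above, a hypothetical illustration (assuming an existing SparkContext named `sc`) of why passing a copy matters:
```
import scala.collection.mutable.ArrayBuffer

val buf = ArrayBuffer(1, 2, 3)
val rdd = sc.parallelize(buf)   // lazy: no snapshot of `buf` is taken here
buf += 4                        // mutation happens before the first action
rdd.count()                     // 4 -- the RDD reflects the modified buffer
sc.parallelize(buf.toList)      // pass a copy to freeze the contents instead
```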

http://git-wip-us.apache.org/repos/asf/spark/blob/e38cb296/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 87b22de..f12d0cf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -111,7 +111,8 @@ private object ParallelCollectionRDD {
   /**
    * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
    * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
-   * it efficient to run Spark over RDDs representing large sets of numbers.
+   * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
+   * is an inclusive Range, we use inclusive range for the last slice.
    */
   def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
     if (numSlices < 1) {
@@ -127,19 +128,15 @@ private object ParallelCollectionRDD {
       })
     }
     seq match {
-      case r: Range.Inclusive => {
-        val sign = if (r.step < 0) {
-          -1
-        } else {
-          1
-        }
-        slice(new Range(
-          r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
-      }
       case r: Range => {
-        positions(r.length, numSlices).map({
-          case (start, end) =>
+        positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
+          // If the range is inclusive, use inclusive range for the last slice
+          if (r.isInclusive && index == numSlices - 1) {
+            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
+          }
+          else {
             new Range(r.start + start * r.step, r.start + end * r.step, r.step)
+          }
         }).toSeq.asInstanceOf[Seq[Seq[T]]]
       }
       case nr: NumericRange[_] => {

http://git-wip-us.apache.org/repos/asf/spark/blob/e38cb296/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index 1b112f1..cd193ae 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -76,6 +76,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     assert(slices(0).mkString(",") === (0 to 32).mkString(","))
     assert(slices(1).mkString(",") === (33 to 66).mkString(","))
     assert(slices(2).mkString(",") === (67 to 100).mkString(","))
+    assert(slices(2).isInstanceOf[Range.Inclusive])
   }
 
   test("empty data") {
@@ -227,4 +228,28 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     assert(slices.map(_.size).reduceLeft(_+_) === 100)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
   }
+
+  test("inclusive ranges with Int.MaxValue and Int.MinValue") {
+    val data1 = 1 to Int.MaxValue
+    val slices1 = ParallelCollectionRDD.slice(data1, 3)
+    assert(slices1.size === 3)
+    assert(slices1.map(_.size).sum === Int.MaxValue)
+    assert(slices1(2).isInstanceOf[Range.Inclusive])
+    val data2 = -2 to Int.MinValue by -1
+    val slices2 = ParallelCollectionRDD.slice(data2, 3)
+    assert(slices2.size == 3)
+    assert(slices2.map(_.size).sum === Int.MaxValue)
+    assert(slices2(2).isInstanceOf[Range.Inclusive])
+  }
+
+  test("empty ranges with Int.MaxValue and Int.MinValue") {
+    val data1 = Int.MaxValue until Int.MaxValue
+    val slices1 = ParallelCollectionRDD.slice(data1, 5)
+    assert(slices1.size === 5)
+    for (i <- 0 until 5) assert(slices1(i).size === 0)
+    val data2 = Int.MinValue until Int.MinValue
+    val slices2 = ParallelCollectionRDD.slice(data2, 5)
+    assert(slices2.size === 5)
+    for (i <- 0 until 5) assert(slices2(i).size === 0)
+  }
 }

