You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/02/20 11:21:44 UTC

spark git commit: SPARK-5744 [CORE] Take 2. RDD.isEmpty / take fails for (empty) RDD of Nothing

Repository: spark
Updated Branches:
  refs/heads/master 70bfb5c72 -> d3dfebebc


SPARK-5744 [CORE] Take 2. RDD.isEmpty / take fails for (empty) RDD of Nothing

Follow-on to https://github.com/apache/spark/pull/4591

Document isEmpty / take / parallelize and their interaction with (an empty) RDD[Nothing] and RDD[Null]. Also, fix a marginally related minor issue with histogram() and EmptyRDD.

CC rxin since you reviewed the last one although I imagine this is an uncontroversial resolution.

Author: Sean Owen <so...@cloudera.com>

Closes #4698 from srowen/SPARK-5744.2 and squashes the following commits:

9b2a811 [Sean Owen] 2 extra javadoc fixes
d1b9fba [Sean Owen] Document isEmpty / take / parallelize and their interaction with (an empty) RDD[Nothing] and RDD[Null]. Also, fix a marginally related minor issue with histogram() and EmptyRDD.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3dfebeb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3dfebeb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3dfebeb

Branch: refs/heads/master
Commit: d3dfebebce9f76e4433e16d4d6d29fb8fa4d4193
Parents: 70bfb5c
Author: Sean Owen <so...@cloudera.com>
Authored: Fri Feb 20 10:21:39 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Feb 20 10:21:39 2015 +0000

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkContext.scala     | 2 ++
 .../scala/org/apache/spark/rdd/DoubleRDDFunctions.scala     | 9 ++++++++-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala          | 7 +++++++
 core/src/test/java/org/apache/spark/JavaAPISuite.java       | 4 ++++
 .../test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala    | 3 +++
 5 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3dfebeb/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d59b466..85ec5ea 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -548,6 +548,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
    * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
    * modified collection. Pass a copy of the argument to avoid this.
+   * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
+   * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
    */
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     assertNotStopped()

http://git-wip-us.apache.org/repos/asf/spark/blob/d3dfebeb/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index e66f83b..03afc28 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -213,7 +213,14 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
     } else {
       basicBucketFunction _
     }
-    self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters)
+    if (self.partitions.length == 0) {
+      new Array[Long](buckets.length - 1)
+    } else {
+      // reduce() requires a non-empty RDD. This works because the mapPartitions will make
+      // non-empty partitions out of empty ones. But it doesn't handle the no-partitions case,
+      // which is below
+      self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters)
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d3dfebeb/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3ab9e54..cf04330 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1146,6 +1146,9 @@ abstract class RDD[T: ClassTag](
    * Take the first num elements of the RDD. It works by first scanning one partition, and use the
    * results from that partition to estimate the number of additional partitions needed to satisfy
    * the limit.
+   *
+   * @note due to complications in the internal implementation, this method will raise
+   * an exception if called on an RDD of `Nothing` or `Null`.
    */
   def take(num: Int): Array[T] = {
     if (num == 0) {
@@ -1258,6 +1261,10 @@ abstract class RDD[T: ClassTag](
   def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
 
   /**
+   * @note due to complications in the internal implementation, this method will raise an
+   * exception if called on an RDD of `Nothing` or `Null`. This may be come up in practice
+   * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
+   * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
    * @return true if and only if the RDD contains no elements at all. Note that an RDD
    *         may be empty even when it has at least 1 partition.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/d3dfebeb/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index b16a1e9..74e88c7 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -708,6 +708,10 @@ public class JavaAPISuite implements Serializable {
     // Test with provided buckets
     long[] histogram = rdd.histogram(expected_buckets);
     Assert.assertArrayEquals(expected_counts, histogram);
+    // SPARK-5744
+    Assert.assertArrayEquals(
+        new long[] {0},
+        sc.parallelizeDoubles(new ArrayList<Double>(0), 1).histogram(new double[]{0.0, 1.0}));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/spark/blob/d3dfebeb/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index de30653..4cd0f97 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -33,6 +33,9 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     val expectedHistogramResults = Array(0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
+    val emptyRDD: RDD[Double] = sc.emptyRDD
+    assert(emptyRDD.histogram(buckets) === expectedHistogramResults)
+    assert(emptyRDD.histogram(buckets, true) === expectedHistogramResults)
   }
 
   test("WorksWithOutOfRangeWithOneBucket") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org