You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/02/20 11:21:44 UTC
spark git commit: SPARK-5744 [CORE] Take 2. RDD.isEmpty / take fails
for (empty) RDD of Nothing
Repository: spark
Updated Branches:
refs/heads/master 70bfb5c72 -> d3dfebebc
SPARK-5744 [CORE] Take 2. RDD.isEmpty / take fails for (empty) RDD of Nothing
Follow-on to https://github.com/apache/spark/pull/4591
Document isEmpty / take / parallelize and their interaction with (an empty) RDD[Nothing] and RDD[Null]. Also, fix a marginally related minor issue with histogram() and EmptyRDD.
CC rxin since you reviewed the last one although I imagine this is an uncontroversial resolution.
Author: Sean Owen <so...@cloudera.com>
Closes #4698 from srowen/SPARK-5744.2 and squashes the following commits:
9b2a811 [Sean Owen] 2 extra javadoc fixes
d1b9fba [Sean Owen] Document isEmpty / take / parallelize and their interaction with (an empty) RDD[Nothing] and RDD[Null]. Also, fix a marginally related minor issue with histogram() and EmptyRDD.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3dfebeb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3dfebeb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3dfebeb
Branch: refs/heads/master
Commit: d3dfebebce9f76e4433e16d4d6d29fb8fa4d4193
Parents: 70bfb5c
Author: Sean Owen <so...@cloudera.com>
Authored: Fri Feb 20 10:21:39 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Feb 20 10:21:39 2015 +0000
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/SparkContext.scala | 2 ++
.../scala/org/apache/spark/rdd/DoubleRDDFunctions.scala | 9 ++++++++-
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 7 +++++++
core/src/test/java/org/apache/spark/JavaAPISuite.java | 4 ++++
.../test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala | 3 +++
5 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d3dfebeb/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d59b466..85ec5ea 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -548,6 +548,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
* to parallelize and before the first action on the RDD, the resultant RDD will reflect the
* modified collection. Pass a copy of the argument to avoid this.
+ * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
+ * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
*/
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
assertNotStopped()
http://git-wip-us.apache.org/repos/asf/spark/blob/d3dfebeb/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index e66f83b..03afc28 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -213,7 +213,14 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
} else {
basicBucketFunction _
}
- self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters)
+ if (self.partitions.length == 0) {
+ new Array[Long](buckets.length - 1)
+ } else {
+ // reduce() requires a non-empty RDD. This works because the mapPartitions will make
+ // non-empty partitions out of empty ones. But it doesn't handle the no-partitions case,
+ // which is handled by the branch above.
+ self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d3dfebeb/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3ab9e54..cf04330 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1146,6 +1146,9 @@ abstract class RDD[T: ClassTag](
* Take the first num elements of the RDD. It works by first scanning one partition, and using the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
+ *
+ * @note due to complications in the internal implementation, this method will raise
+ * an exception if called on an RDD of `Nothing` or `Null`.
*/
def take(num: Int): Array[T] = {
if (num == 0) {
@@ -1258,6 +1261,10 @@ abstract class RDD[T: ClassTag](
def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
/**
+ * @note due to complications in the internal implementation, this method will raise an
+ * exception if called on an RDD of `Nothing` or `Null`. This may come up in practice
+ * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
+ * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
* @return true if and only if the RDD contains no elements at all. Note that an RDD
* may be empty even when it has at least 1 partition.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/d3dfebeb/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index b16a1e9..74e88c7 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -708,6 +708,10 @@ public class JavaAPISuite implements Serializable {
// Test with provided buckets
long[] histogram = rdd.histogram(expected_buckets);
Assert.assertArrayEquals(expected_counts, histogram);
+ // SPARK-5744
+ Assert.assertArrayEquals(
+ new long[] {0},
+ sc.parallelizeDoubles(new ArrayList<Double>(0), 1).histogram(new double[]{0.0, 1.0}));
}
@Test
http://git-wip-us.apache.org/repos/asf/spark/blob/d3dfebeb/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index de30653..4cd0f97 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -33,6 +33,9 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
val expectedHistogramResults = Array(0)
assert(histogramResults === expectedHistogramResults)
assert(histogramResults2 === expectedHistogramResults)
+ val emptyRDD: RDD[Double] = sc.emptyRDD
+ assert(emptyRDD.histogram(buckets) === expectedHistogramResults)
+ assert(emptyRDD.histogram(buckets, true) === expectedHistogramResults)
}
test("WorksWithOutOfRangeWithOneBucket") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org