Posted to commits@spark.apache.org by pw...@apache.org on 2015/01/20 07:51:00 UTC
spark git commit: SPARK-5270 [CORE] Provide isEmpty() function in RDD API
Repository: spark
Updated Branches:
refs/heads/master e69fb8c75 -> 306ff187a
SPARK-5270 [CORE] Provide isEmpty() function in RDD API
Pretty minor, but submitted for consideration -- this would at least help people make this check in the most efficient way I know.
Author: Sean Owen <so...@cloudera.com>
Closes #4074 from srowen/SPARK-5270 and squashes the following commits:
66885b8 [Sean Owen] Add note that JavaRDDLike should not be implemented by user code
2e9b490 [Sean Owen] More tests, and Mima-exclude the new isEmpty method in JavaRDDLike
28395ff [Sean Owen] Add isEmpty to Java, Python
7dd04b7 [Sean Owen] Add efficient RDD.isEmpty()
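For context on the efficiency claim above: the common idiom rdd.count() == 0
must scan every partition just to compare the total with zero, whereas
isEmpty() inspects at most one element. A minimal Scala sketch of the
difference, assuming an active SparkContext named sc:

    val rdd = sc.parallelize(1 to 1000000, 100)

    // The old idiom: count() runs a job over all 100 partitions.
    val emptyViaCount = rdd.count() == 0

    // The new method: short-circuits on zero partitions, otherwise runs
    // take(1), which stops as soon as it sees a single element.
    val emptyViaIsEmpty = rdd.isEmpty()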
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/306ff187
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/306ff187
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/306ff187
Branch: refs/heads/master
Commit: 306ff187af0c49f61f4bc1850021561397b4f8f1
Parents: e69fb8c
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Jan 19 22:50:44 2015 -0800
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Mon Jan 19 22:50:45 2015 -0800
----------------------------------------------------------------------
.../org/apache/spark/api/java/JavaRDDLike.scala | 10 ++++++++++
.../main/scala/org/apache/spark/rdd/RDD.scala | 6 ++++++
.../java/org/apache/spark/JavaAPISuite.java | 21 ++++++++++++++++++++
.../scala/org/apache/spark/rdd/RDDSuite.scala | 9 +++++++++
project/MimaExcludes.scala | 4 ++++
python/pyspark/rdd.py | 12 +++++++++++
6 files changed, 62 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index bd45163..62bf18d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -38,6 +38,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
+/**
+ * Defines operations common to several Java RDD implementations.
+ * Note that this trait is not intended to be implemented by user code.
+ */
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This
@@ -436,6 +440,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def first(): T = rdd.first()
/**
+ * @return true if and only if the RDD contains no elements at all. Note that an RDD
+ * may be empty even when it has at least 1 partition.
+ */
+ def isEmpty(): Boolean = rdd.isEmpty()
+
+ /**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String): Unit = {
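The scaladoc added here calls out a subtlety worth a concrete illustration:
emptiness is a property of an RDD's elements, not of its partition count.
A short sketch, assuming an active SparkContext named sc:

    // An RDD can carry partitions yet hold no elements.
    val emptyWithPartitions = sc.parallelize(Seq.empty[Int], numSlices = 4)
    assert(emptyWithPartitions.partitions.length == 4)
    assert(emptyWithPartitions.isEmpty())

    // The converse boundary case: sc.emptyRDD has zero partitions.
    assert(sc.emptyRDD[Int].isEmpty())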
http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5118e2b..97012c7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1176,6 +1176,12 @@ abstract class RDD[T: ClassTag](
def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
/**
+ * @return true if and only if the RDD contains no elements at all. Note that an RDD
+ * may be empty even when it has at least 1 partition.
+ */
+ def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0
+
+ /**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
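The one-line implementation is a two-stage short circuit: an RDD with no
partitions is declared empty without launching a job at all; otherwise
take(1) runs a job that scans partitions incrementally and aborts after the
first element it finds. A sketch of the two branches, assuming an active
SparkContext named sc:

    // Branch 1: partitions.length == 0, so no Spark job is launched.
    assert(sc.emptyRDD[Int].isEmpty())

    // Branch 2: partitions exist, so take(1) launches a job; it returns
    // after the first element, so a non-empty RDD is detected quickly.
    assert(!sc.parallelize(1 to 1000000, 100).isEmpty())

Note that when the RDD is genuinely empty but still has partitions, take(1)
may end up visiting all of them; the savings are on the common non-empty
path, where it stops at the first element.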
http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 07b1e44..004de05 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -607,6 +607,27 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void isEmpty() {
+ Assert.assertTrue(sc.emptyRDD().isEmpty());
+ Assert.assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
+ Assert.assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
+ Assert.assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(
+ new Function<Integer,Boolean>() {
+ @Override
+ public Boolean call(Integer i) {
+ return i < 0;
+ }
+ }).isEmpty());
+ Assert.assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(
+ new Function<Integer, Boolean>() {
+ @Override
+ public Boolean call(Integer i) {
+ return i > 1;
+ }
+ }).isEmpty());
+ }
+
+ @Test
public void cartesian() {
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 0deb9b1..381ee2d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -52,6 +52,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
+ assert(!nums.isEmpty())
assert(nums.max() === 4)
assert(nums.min() === 1)
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
@@ -545,6 +546,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sortedTopK === nums.sorted(ord).take(5))
}
+ test("isEmpty") {
+ assert(sc.emptyRDD.isEmpty())
+ assert(sc.parallelize(Seq[Int]()).isEmpty())
+ assert(!sc.parallelize(Seq(1)).isEmpty())
+ assert(sc.parallelize(Seq(1,2,3), 3).filter(_ < 0).isEmpty())
+ assert(!sc.parallelize(Seq(1,2,3), 3).filter(_ > 1).isEmpty())
+ }
+
test("sample preserves partitioner") {
val partitioner = new HashPartitioner(2)
val rdd = sc.parallelize(Seq((0, 1), (2, 3))).partitionBy(partitioner)
http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 0ccbfcb..95fef23 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -82,6 +82,10 @@ object MimaExcludes {
// SPARK-5166 Spark SQL API stabilization
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Transformer.transform"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Estimator.fit")
+ ) ++ Seq(
+ // SPARK-5270
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.isEmpty")
)
case v if v.startsWith("1.2") =>
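Why the exclusion is needed: adding a method to a public trait changes its
binary interface, so MiMa (the Migration Manager check run against the
previous Spark release) flags the new isEmpty as a MissingMethodProblem.
Since JavaRDDLike is now documented as not intended for implementation by
user code, the finding is safe to silence. A self-contained sketch of the
filter idiom, assuming the standard sbt-mima-plugin classes:

    import com.typesafe.tools.mima.core._

    // ProblemFilters.exclude tells MiMa to ignore one specific finding;
    // MissingMethodProblem covers a method newly added to a public type.
    val spark5270Excludes = Seq(
      ProblemFilters.exclude[MissingMethodProblem](
        "org.apache.spark.api.java.JavaRDDLike.isEmpty")
    )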
http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index c1120cf..4977400 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1130,6 +1130,18 @@ class RDD(object):
return rs[0]
raise ValueError("RDD is empty")
+ def isEmpty(self):
+ """
+ Returns true if and only if the RDD contains no elements at all. Note that an RDD
+ may be empty even when it has at least 1 partition.
+
+ >>> sc.parallelize([]).isEmpty()
+ True
+ >>> sc.parallelize([1]).isEmpty()
+ False
+ """
+ return self._jrdd.partitions().size() == 0 or len(self.take(1)) == 0
+
def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
"""
Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file