Posted to commits@spark.apache.org by pw...@apache.org on 2015/01/20 07:51:00 UTC

spark git commit: SPARK-5270 [CORE] Provide isEmpty() function in RDD API

Repository: spark
Updated Branches:
  refs/heads/master e69fb8c75 -> 306ff187a


SPARK-5270 [CORE] Provide isEmpty() function in RDD API

Pretty minor, but submitted for consideration -- this would at least give people the most efficient way I know of to make this check.
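
For context, a minimal Scala sketch of the difference this makes, assuming a live SparkContext named sc; the values below are illustrative, not part of this commit:

    val rdd = sc.parallelize(Seq(1, 2, 3), numSlices = 3)

    // Scans every partition just to learn the total count:
    val emptyViaCount = rdd.count() == 0

    // Runs a job that stops as soon as one element is found, and
    // skips the job entirely for an RDD with no partitions:
    val emptyViaIsEmpty = rdd.isEmpty()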

Author: Sean Owen <so...@cloudera.com>

Closes #4074 from srowen/SPARK-5270 and squashes the following commits:

66885b8 [Sean Owen] Add note that JavaRDDLike should not be implemented by user code
2e9b490 [Sean Owen] More tests, and Mima-exclude the new isEmpty method in JavaRDDLike
28395ff [Sean Owen] Add isEmpty to Java, Python
7dd04b7 [Sean Owen] Add efficient RDD.isEmpty()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/306ff187
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/306ff187
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/306ff187

Branch: refs/heads/master
Commit: 306ff187af0c49f61f4bc1850021561397b4f8f1
Parents: e69fb8c
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Jan 19 22:50:44 2015 -0800
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Mon Jan 19 22:50:45 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 10 ++++++++++
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  6 ++++++
 .../java/org/apache/spark/JavaAPISuite.java     | 21 ++++++++++++++++++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |  9 +++++++++
 project/MimaExcludes.scala                      |  4 ++++
 python/pyspark/rdd.py                           | 12 +++++++++++
 6 files changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index bd45163..62bf18d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -38,6 +38,10 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
+/**
+ * Defines operations common to several Java RDD implementations.
+ * Note that this trait is not intended to be implemented by user code.
+ */
 trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def wrapRDD(rdd: RDD[T]): This
 
@@ -436,6 +440,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def first(): T = rdd.first()
 
   /**
+   * @return true if and only if the RDD contains no elements at all. Note that an RDD
+   *         may be empty even when it has at least 1 partition.
+   */
+  def isEmpty(): Boolean = rdd.isEmpty()
+
+  /**
    * Save this RDD as a text file, using string representations of elements.
    */
   def saveAsTextFile(path: String): Unit = {
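
Note that the Java-side method simply delegates to isEmpty() on the wrapped Scala RDD, so both APIs share the same semantics; the new scaladoc above also makes explicit that user code should not implement this trait.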

http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5118e2b..97012c7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1176,6 +1176,12 @@ abstract class RDD[T: ClassTag](
   def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
 
   /**
+   * @return true if and only if the RDD contains no elements at all. Note that an RDD
+   *         may be empty even when it has at least 1 partition.
+   */
+  def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0
+
+  /**
    * Save this RDD as a text file, using string representations of elements.
    */
   def saveAsTextFile(path: String) {
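
The implementation short-circuits twice: an RDD with zero partitions is empty without running any job at all, and otherwise take(1) launches a job that stops as soon as a single element is found, so typically only the first partition is scanned. A small usage sketch, again assuming a SparkContext sc (the output path is illustrative only):

    val filtered = sc.parallelize(1 to 100, 4).filter(_ < 0)
    if (filtered.isEmpty()) {
      println("nothing to save")  // cheap check; no full count needed
    } else {
      filtered.saveAsTextFile("/tmp/nonempty-output")
    }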

http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 07b1e44..004de05 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -607,6 +607,27 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
+  public void isEmpty() {
+    Assert.assertTrue(sc.emptyRDD().isEmpty());
+    Assert.assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
+    Assert.assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
+    Assert.assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(
+        new Function<Integer, Boolean>() {
+          @Override
+          public Boolean call(Integer i) {
+            return i < 0;
+          }
+        }).isEmpty());
+    Assert.assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(
+        new Function<Integer, Boolean>() {
+          @Override
+          public Boolean call(Integer i) {
+            return i > 1;
+          }
+        }).isEmpty());
+  }
+
+  @Test
   public void cartesian() {
     JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
     JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));

http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 0deb9b1..381ee2d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -52,6 +52,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
     assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
+    assert(!nums.isEmpty())
     assert(nums.max() === 4)
     assert(nums.min() === 1)
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
@@ -545,6 +546,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(sortedTopK === nums.sorted(ord).take(5))
   }
 
+  test("isEmpty") {
+    assert(sc.emptyRDD.isEmpty())
+    assert(sc.parallelize(Seq[Int]()).isEmpty())
+    assert(!sc.parallelize(Seq(1)).isEmpty())
+    assert(sc.parallelize(Seq(1, 2, 3), 3).filter(_ < 0).isEmpty())
+    assert(!sc.parallelize(Seq(1, 2, 3), 3).filter(_ > 1).isEmpty())
+  }
+
   test("sample preserves partitioner") {
     val partitioner = new HashPartitioner(2)
     val rdd = sc.parallelize(Seq((0, 1), (2, 3))).partitionBy(partitioner)

http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 0ccbfcb..95fef23 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -82,6 +82,10 @@ object MimaExcludes {
             // SPARK-5166 Spark SQL API stabilization
             ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Transformer.transform"),
             ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Estimator.fit")
+          ) ++ Seq(
+            // SPARK-5270
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaRDDLike.isEmpty")
           )
 
         case v if v.startsWith("1.2") =>
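
As the squashed commits note, adding isEmpty() to the JavaRDDLike trait changes the trait's binary interface, so the MiMa compatibility checker reports a MissingMethodProblem against the previous release. The exclusion above records that the change is intentional; it is safe because, per the new scaladoc, JavaRDDLike is not meant to be implemented by user code.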

http://git-wip-us.apache.org/repos/asf/spark/blob/306ff187/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index c1120cf..4977400 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1130,6 +1130,18 @@ class RDD(object):
             return rs[0]
         raise ValueError("RDD is empty")
 
+    def isEmpty(self):
+        """
+        Returns true if and only if the RDD contains no elements at all. Note that an RDD
+        may be empty even when it has at least 1 partition.
+
+        >>> sc.parallelize([]).isEmpty()
+        True
+        >>> sc.parallelize([1]).isEmpty()
+        False
+        """
+        return self._jrdd.partitions().size() == 0 or len(self.take(1)) == 0
+
     def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
         """
         Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
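
The Python implementation mirrors the Scala one: it first asks the JVM-side RDD for its partition count, which runs no job at all, and only when at least one partition exists does it fall back to take(1), which stops at the first element found.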


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org