You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/06/17 22:59:44 UTC

spark git commit: [SPARK-8373] [PYSPARK] Add emptyRDD to pyspark and fix the issue when calling sum on an empty RDD

Repository: spark
Updated Branches:
  refs/heads/master 2837e0670 -> 0fc4b96f3


[SPARK-8373] [PYSPARK] Add emptyRDD to pyspark and fix the issue when calling sum on an empty RDD

This PR fixes the sum issue and also adds `emptyRDD` so that it's easy to create a test case.

Author: zsxwing <zs...@gmail.com>

Closes #6826 from zsxwing/python-emptyRDD and squashes the following commits:

b36993f [zsxwing] Update the return type to JavaRDD[T]
71df047 [zsxwing] Add emptyRDD to pyspark and fix the issue when calling sum on an empty RDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fc4b96f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fc4b96f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fc4b96f

Branch: refs/heads/master
Commit: 0fc4b96f3e3bf81724ac133a6acc97c1b77271b4
Parents: 2837e06
Author: zsxwing <zs...@gmail.com>
Authored: Wed Jun 17 13:59:39 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Jun 17 13:59:39 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala   | 5 +++++
 python/pyspark/context.py                                    | 6 ++++++
 python/pyspark/rdd.py                                        | 2 +-
 python/pyspark/tests.py                                      | 8 ++++++++
 4 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0fc4b96f/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 55a37f8..0103f6c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -425,6 +425,11 @@ private[spark] object PythonRDD extends Logging {
     iter.foreach(write)
   }
 
+  /** Create an RDD that has no partitions or elements. */
+  def emptyRDD[T](sc: JavaSparkContext): JavaRDD[T] = {
+    sc.emptyRDD[T]
+  }
+
   /**
    * Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]],
    * key and value class.

http://git-wip-us.apache.org/repos/asf/spark/blob/0fc4b96f/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 44d90f1..90b2fff 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -324,6 +324,12 @@ class SparkContext(object):
         with SparkContext._lock:
             SparkContext._active_spark_context = None
 
+    def emptyRDD(self):
+        """
+        Create an RDD that has no partitions or elements.
+        """
+        return RDD(self._jsc.emptyRDD(), self, NoOpSerializer())
+
     def range(self, start, end=None, step=1, numSlices=None):
         """
         Create a new RDD of int containing elements from `start` to `end`

http://git-wip-us.apache.org/repos/asf/spark/blob/0fc4b96f/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 98a8ff8..20c0bc9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -960,7 +960,7 @@ class RDD(object):
         >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
         6.0
         """
-        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+        return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
 
     def count(self):
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/0fc4b96f/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index f9fb37f..11b402e 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -458,6 +458,14 @@ class RDDTests(ReusedPySparkTestCase):
         self.assertEqual(id + 1, id2)
         self.assertEqual(id2, rdd2.id())
 
+    def test_empty_rdd(self):
+        rdd = self.sc.emptyRDD()
+        self.assertTrue(rdd.isEmpty())
+
+    def test_sum(self):
+        self.assertEqual(0, self.sc.emptyRDD().sum())
+        self.assertEqual(6, self.sc.parallelize([1, 2, 3]).sum())
+
     def test_save_as_textfile_with_unicode(self):
         # Regression test for SPARK-970
         x = u"\u00A1Hola, mundo!"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org