You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/03/10 21:27:05 UTC

git commit: SPARK-977 Added Python RDD.zip function

Repository: spark
Updated Branches:
  refs/heads/master 5d98cfc1c -> e1e09e0ef


SPARK-977 Added Python RDD.zip function

was raised earlier as a part of  apache/incubator-spark#486

Author: Prabin Banka <pr...@imaginea.com>

Closes #76 from prabinb/python-api-zip and squashes the following commits:

b1a31a0 [Prabin Banka] Added Python RDD.zip function


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1e09e0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1e09e0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1e09e0e

Branch: refs/heads/master
Commit: e1e09e0ef6b18e034727403d81747d899b042219
Parents: 5d98cfc
Author: Prabin Banka <pr...@imaginea.com>
Authored: Mon Mar 10 13:27:00 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Mon Mar 10 13:27:00 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py         | 20 +++++++++++++++++++-
 python/pyspark/serializers.py | 29 ++++++++++++++++++++++++++++-
 2 files changed, 47 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e1e09e0e/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index e72f57d..5ab27ff 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -30,7 +30,7 @@ from threading import Thread
 import warnings
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
-    BatchedSerializer, CloudPickleSerializer, pack_long
+    BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
 from pyspark.join import python_join, python_left_outer_join, \
     python_right_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
@@ -1081,6 +1081,24 @@ class RDD(object):
         jrdd = self._jrdd.coalesce(numPartitions)
         return RDD(jrdd, self.ctx, self._jrdd_deserializer)
 
+    def zip(self, other):
+        """
+        Zips this RDD with another one, returning key-value pairs with the first element in each RDD
+        second element in each RDD, etc. Assumes that the two RDDs have the same number of
+        partitions and the same number of elements in each partition (e.g. one was made through
+        a map on the other).
+
+        >>> x = sc.parallelize(range(0,5))
+        >>> y = sc.parallelize(range(1000, 1005))
+        >>> x.zip(y).collect()
+        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
+        """
+        pairRDD = self._jrdd.zip(other._jrdd)
+        deserializer = PairDeserializer(self._jrdd_deserializer,
+                                             other._jrdd_deserializer)
+        return RDD(pairRDD, self.ctx, deserializer)
+
+
     # TODO: `lookup` is disabled because we can't make direct comparisons based
     # on the key; we need to compare the hash of the key to the hash of the
     # keys in the pairs.  This could be an expensive operation, since those

http://git-wip-us.apache.org/repos/asf/spark/blob/e1e09e0e/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 8c6ad79..12c63f1 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -204,7 +204,7 @@ class CartesianDeserializer(FramedSerializer):
         self.key_ser = key_ser
         self.val_ser = val_ser
 
-    def load_stream(self, stream):
+    def prepare_keys_values(self, stream):
         key_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_stream = self.val_ser._load_stream_without_unbatching(stream)
         key_is_batched = isinstance(self.key_ser, BatchedSerializer)
@@ -212,6 +212,10 @@ class CartesianDeserializer(FramedSerializer):
         for (keys, vals) in izip(key_stream, val_stream):
             keys = keys if key_is_batched else [keys]
             vals = vals if val_is_batched else [vals]
+            yield (keys, vals)
+
+    def load_stream(self, stream):
+        for (keys, vals) in self.prepare_keys_values(stream):
             for pair in product(keys, vals):
                 yield pair
 
@@ -224,6 +228,29 @@ class CartesianDeserializer(FramedSerializer):
                (str(self.key_ser), str(self.val_ser))
 
 
+class PairDeserializer(CartesianDeserializer):
+    """
+    Deserializes the JavaRDD zip() of two PythonRDDs.
+    """
+
+    def __init__(self, key_ser, val_ser):
+        self.key_ser = key_ser
+        self.val_ser = val_ser
+
+    def load_stream(self, stream):
+        for (keys, vals) in self.prepare_keys_values(stream):
+            for pair in izip(keys, vals):
+                yield pair
+
+    def __eq__(self, other):
+        return isinstance(other, PairDeserializer) and \
+               self.key_ser == other.key_ser and self.val_ser == other.val_ser
+
+    def __str__(self):
+        return "PairDeserializer<%s, %s>" % \
+               (str(self.key_ser), str(self.val_ser))
+
+
 class NoOpSerializer(FramedSerializer):
 
     def loads(self, obj): return obj