Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/17 06:23:38 UTC
[1/4] git commit: Updated link for pyspark examples in docs
Repository: spark
Updated Branches:
refs/heads/branch-0.9 ef74e44e0 -> 1dc1e988f
Updated link for pyspark examples in docs
Author: Jyotiska NK <jy...@gmail.com>
Closes #22 from jyotiska/pyspark_docs and squashes the following commits:
426136c [Jyotiska NK] Updated link for pyspark examples
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e74e79a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e74e79a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e74e79a0
Branch: refs/heads/branch-0.9
Commit: e74e79a06fc17a4ea71c882de2549bbd4dbf76c5
Parents: ef74e44
Author: Jyotiska NK <jy...@gmail.com>
Authored: Wed Feb 26 21:37:04 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Mar 16 22:12:51 2014 -0700
----------------------------------------------------------------------
docs/python-programming-guide.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e74e79a0/docs/python-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 7c5283f..57ed54c 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -157,7 +157,7 @@ some example applications.
# Where to Go from Here
-PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples).
+PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/master/python/examples).
You can run them by passing the files to `pyspark`; e.g.:
./bin/pyspark python/examples/wordcount.py
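For context, the wordcount example the guide points to is a standard PySpark word count. The sketch below is illustrative only, not the exact contents of python/examples/wordcount.py; the input path and application name are assumptions:

    from operator import add
    from pyspark import SparkContext

    # Illustrative word count in the style of the 0.9-era examples;
    # the input file and app name are assumed, not taken from the repo.
    sc = SparkContext("local", "PythonWordCount")
    lines = sc.textFile("README.md")
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(add)
    for (word, count) in counts.collect():
        print "%s: %i" % (word, count)  # Python 2 print, matching this era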
[3/4] git commit: Spark-1163, Added missing Python RDD functions
Posted by pw...@apache.org.
Spark-1163, Added missing Python RDD functions
Author: prabinb <pr...@imaginea.com>
Closes #92 from prabinb/python-api-rdd and squashes the following commits:
51129ca [prabinb] Added missing Python RDD functions. Added __repr__ function to StorageLevel class. Added doctest for RDD.getStorageLevel().
Conflicts:
python/pyspark/rdd.py
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/249930ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/249930ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/249930ae
Branch: refs/heads/branch-0.9
Commit: 249930aeaca6cfa7592e77fc0472069c1a63fdb6
Parents: 4480505
Author: prabinb <pr...@imaginea.com>
Authored: Tue Mar 11 23:57:05 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Mar 16 22:14:53 2014 -0700
----------------------------------------------------------------------
python/pyspark/rdd.py | 90 +++++++++++++++++++++++++++++++++++++
python/pyspark/storagelevel.py | 4 ++
2 files changed, 94 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/249930ae/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 487bfb1..0fdb9e6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -36,6 +36,7 @@ from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
+from pyspark.storagelevel import StorageLevel
from py4j.java_collections import ListConverter, MapConverter
@@ -1026,6 +1027,95 @@ class RDD(object):
"""
return self.map(lambda x: (f(x), x))
+ def repartition(self, numPartitions):
+ """
+ Return a new RDD that has exactly numPartitions partitions.
+
+ Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+ a shuffle to redistribute data.
+ If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+ which can avoid performing a shuffle.
+ >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
+ >>> sorted(rdd.glom().collect())
+ [[1], [2, 3], [4, 5], [6, 7]]
+ >>> len(rdd.repartition(2).glom().collect())
+ 2
+ >>> len(rdd.repartition(10).glom().collect())
+ 10
+ """
+ jrdd = self._jrdd.repartition(numPartitions)
+ return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+
+ def coalesce(self, numPartitions, shuffle=False):
+ """
+ Return a new RDD that is reduced into `numPartitions` partitions.
+ >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
+ [[1], [2, 3], [4, 5]]
+ >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
+ [[1, 2, 3, 4, 5]]
+ """
+ jrdd = self._jrdd.coalesce(numPartitions, shuffle)
+ return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+
+ def zip(self, other):
+ """
+ Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
+ the second element in each RDD, and so on. Assumes that the two RDDs have the same number of
+ partitions and the same number of elements in each partition (e.g. one was made through
+ a map on the other).
+
+ >>> x = sc.parallelize(range(0,5))
+ >>> y = sc.parallelize(range(1000, 1005))
+ >>> x.zip(y).collect()
+ [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
+ """
+ pairRDD = self._jrdd.zip(other._jrdd)
+ deserializer = PairDeserializer(self._jrdd_deserializer,
+ other._jrdd_deserializer)
+ return RDD(pairRDD, self.ctx, deserializer)
+
+ def name(self):
+ """
+ Return the name of this RDD.
+ """
+ name_ = self._jrdd.name()
+ if not name_:
+ return None
+ return name_.encode('utf-8')
+
+ def setName(self, name):
+ """
+ Assign a name to this RDD.
+ >>> rdd1 = sc.parallelize([1,2])
+ >>> rdd1.setName('RDD1')
+ >>> rdd1.name()
+ 'RDD1'
+ """
+ self._jrdd.setName(name)
+
+ def toDebugString(self):
+ """
+ A description of this RDD and its recursive dependencies for debugging.
+ """
+ debug_string = self._jrdd.toDebugString()
+ if not debug_string:
+ return None
+ return debug_string.encode('utf-8')
+
+ def getStorageLevel(self):
+ """
+ Get the RDD's current storage level.
+ >>> rdd1 = sc.parallelize([1,2])
+ >>> rdd1.getStorageLevel()
+ StorageLevel(False, False, False, 1)
+ """
+ java_storage_level = self._jrdd.getStorageLevel()
+ storage_level = StorageLevel(java_storage_level.useDisk(),
+ java_storage_level.useMemory(),
+ java_storage_level.deserialized(),
+ java_storage_level.replication())
+ return storage_level
+
# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
http://git-wip-us.apache.org/repos/asf/spark/blob/249930ae/python/pyspark/storagelevel.py
----------------------------------------------------------------------
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index b31f476..c3e3a44 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -31,6 +31,10 @@ class StorageLevel:
self.deserialized = deserialized
self.replication = replication
+ def __repr__(self):
+ return "StorageLevel(%s, %s, %s, %s)" % (
+ self.useDisk, self.useMemory, self.deserialized, self.replication)
+
StorageLevel.DISK_ONLY = StorageLevel(True, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2)
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)
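Taken together, the functions added in this commit can be exercised from a PySpark shell roughly as follows. This is a sketch assuming an existing SparkContext named sc (as in the doctests above); the expected outputs follow the doctests:

    rdd = sc.parallelize(range(8), 4)
    rdd.setName("numbers")                           # setName returns None here, so call it on its own line
    print rdd.name()                                 # 'numbers'
    print len(rdd.repartition(2).glom().collect())   # 2 -- shuffled into two partitions
    print rdd.coalesce(1).glom().collect()           # [[0, 1, 2, 3, 4, 5, 6, 7]] -- merged without a shuffle
    print rdd.getStorageLevel()                      # StorageLevel(False, False, False, 1), via the new __repr__
    print rdd.toDebugString()                        # lineage description of this RDD and its dependencies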
[4/4] git commit: SPARK-977 Added Python RDD.zip function
Posted by pw...@apache.org.
SPARK-977 Added Python RDD.zip function
This change was raised earlier as part of apache/incubator-spark#486
Author: Prabin Banka <pr...@imaginea.com>
Closes #76 from prabinb/python-api-zip and squashes the following commits:
b1a31a0 [Prabin Banka] Added Python RDD.zip function
Conflicts:
python/pyspark/rdd.py
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1dc1e988
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1dc1e988
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1dc1e988
Branch: refs/heads/branch-0.9
Commit: 1dc1e988fd379bce8f661091bd8224724a7345a3
Parents: 249930a
Author: Prabin Banka <pr...@imaginea.com>
Authored: Mon Mar 10 13:27:00 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Mar 16 22:16:17 2014 -0700
----------------------------------------------------------------------
python/pyspark/rdd.py | 2 +-
python/pyspark/serializers.py | 29 ++++++++++++++++++++++++++++-
2 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1dc1e988/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 0fdb9e6..c29cefa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -31,7 +31,7 @@ import warnings
from heapq import heappush, heappop, heappushpop
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
- BatchedSerializer, CloudPickleSerializer, pack_long
+ BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
http://git-wip-us.apache.org/repos/asf/spark/blob/1dc1e988/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 8c6ad79..12c63f1 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -204,7 +204,7 @@ class CartesianDeserializer(FramedSerializer):
self.key_ser = key_ser
self.val_ser = val_ser
- def load_stream(self, stream):
+ def prepare_keys_values(self, stream):
key_stream = self.key_ser._load_stream_without_unbatching(stream)
val_stream = self.val_ser._load_stream_without_unbatching(stream)
key_is_batched = isinstance(self.key_ser, BatchedSerializer)
@@ -212,6 +212,10 @@ class CartesianDeserializer(FramedSerializer):
for (keys, vals) in izip(key_stream, val_stream):
keys = keys if key_is_batched else [keys]
vals = vals if val_is_batched else [vals]
+ yield (keys, vals)
+
+ def load_stream(self, stream):
+ for (keys, vals) in self.prepare_keys_values(stream):
for pair in product(keys, vals):
yield pair
@@ -224,6 +228,29 @@ class CartesianDeserializer(FramedSerializer):
(str(self.key_ser), str(self.val_ser))
+class PairDeserializer(CartesianDeserializer):
+ """
+ Deserializes the JavaRDD zip() of two PythonRDDs.
+ """
+
+ def __init__(self, key_ser, val_ser):
+ self.key_ser = key_ser
+ self.val_ser = val_ser
+
+ def load_stream(self, stream):
+ for (keys, vals) in self.prepare_keys_values(stream):
+ for pair in izip(keys, vals):
+ yield pair
+
+ def __eq__(self, other):
+ return isinstance(other, PairDeserializer) and \
+ self.key_ser == other.key_ser and self.val_ser == other.val_ser
+
+ def __str__(self):
+ return "PairDeserializer<%s, %s>" % \
+ (str(self.key_ser), str(self.val_ser))
+
+
class NoOpSerializer(FramedSerializer):
def loads(self, obj): return obj
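The difference between the two deserializers is the pairing strategy: CartesianDeserializer expands each batch of keys and values into their cross product, while the new PairDeserializer pairs them element-wise, which is what zip() requires. A minimal pure-Python illustration of the two strategies, using the same itertools functions the module already imports:

    from itertools import izip, product

    keys = [0, 1, 2]
    vals = ['a', 'b', 'c']

    # CartesianDeserializer-style pairing: every key with every value
    print list(product(keys, vals))  # [(0, 'a'), (0, 'b'), (0, 'c'), (1, 'a'), ...]

    # PairDeserializer-style pairing: element-wise, as zip() expects
    print list(izip(keys, vals))     # [(0, 'a'), (1, 'b'), (2, 'c')]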
[2/4] git commit: SPARK-1168, Added foldByKey to pyspark.
Posted by pw...@apache.org.
SPARK-1168, Added foldByKey to pyspark.
Author: Prashant Sharma <pr...@imaginea.com>
Closes #115 from ScrapCodes/SPARK-1168/pyspark-foldByKey and squashes the following commits:
db6f67e [Prashant Sharma] SPARK-1168, Added foldByKey to pyspark.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44805058
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44805058
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44805058
Branch: refs/heads/branch-0.9
Commit: 44805058fc054f5d2f05a763bb3735c5b1afefb8
Parents: e74e79a
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Mon Mar 10 13:37:11 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Mar 16 22:13:33 2014 -0700
----------------------------------------------------------------------
python/pyspark/rdd.py | 14 ++++++++++++++
1 file changed, 14 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/44805058/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 678b005..487bfb1 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -910,7 +910,21 @@ class RDD(object):
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()
return shuffled.mapPartitions(_mergeCombiners)
+
+ def foldByKey(self, zeroValue, func, numPartitions=None):
+ """
+ Merge the values for each key using an associative function "func" and a neutral "zeroValue",
+ which may be added to the result an arbitrary number of times but must not change
+ the result (e.g., 0 for addition, or 1 for multiplication).
+ >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
+ >>> from operator import add
+ >>> rdd.foldByKey(0, add).collect()
+ [('a', 2), ('b', 1)]
+ """
+ return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
+
+
# TODO: support variant with custom partitioner
def groupByKey(self, numPartitions=None):
"""