You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/24 05:53:36 UTC
git commit: Merge pull request #505 from JoshRosen/SPARK-1026
Updated Branches:
refs/heads/branch-0.9 e66d4c27c -> d0a105d4e
Merge pull request #505 from JoshRosen/SPARK-1026
Deprecate mapPartitionsWithSplit in PySpark (SPARK-1026)
This commit deprecates `mapPartitionsWithSplit` in PySpark (see [SPARK-1026](https://spark-project.atlassian.net/browse/SPARK-1026)) and removes the remaining references to it from the docs.
(cherry picked from commit 05be7047744c88e64e7e6bd973f9bcfacd00da5f)
Signed-off-by: Patrick Wendell <pw...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d0a105d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d0a105d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d0a105d4
Branch: refs/heads/branch-0.9
Commit: d0a105d4e5ec3c84bc5ee8c8d55cca40f43cb9b8
Parents: e66d4c2
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 23 20:53:18 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 23 20:53:31 2014 -0800
----------------------------------------------------------------------
docs/scala-programming-guide.md | 4 ++--
python/pyspark/rdd.py | 25 +++++++++++++++++++++----
2 files changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0a105d4/docs/scala-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index c1ef46a..7c0f67b 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -168,9 +168,9 @@ The following tables list the transformations and actions currently supported (s
Iterator[T] => Iterator[U] when running on an RDD of type T. </td>
</tr>
<tr>
- <td> <b>mapPartitionsWithSplit</b>(<i>func</i>) </td>
+ <td> <b>mapPartitionsWithIndex</b>(<i>func</i>) </td>
<td> Similar to mapPartitions, but also provides <i>func</i> with an integer value representing the index of
- the split, so <i>func</i> must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
+ the partition, so <i>func</i> must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
</td>
</tr>
<tr>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d0a105d4/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6fb4a7b..1ad4b52 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -27,6 +27,7 @@ import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
+import warnings
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, pack_long
@@ -179,7 +180,7 @@ class RDD(object):
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
"""
def func(s, iterator): return chain.from_iterable(imap(f, iterator))
- return self.mapPartitionsWithSplit(func, preservesPartitioning)
+ return self.mapPartitionsWithIndex(func, preservesPartitioning)
def mapPartitions(self, f, preservesPartitioning=False):
"""
@@ -191,10 +192,24 @@ class RDD(object):
[3, 7]
"""
def func(s, iterator): return f(iterator)
- return self.mapPartitionsWithSplit(func)
+ return self.mapPartitionsWithIndex(func)
+
+ def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+ """
+ Return a new RDD by applying a function to each partition of this RDD,
+ while tracking the index of the original partition.
+
+ >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
+ >>> def f(splitIndex, iterator): yield splitIndex
+ >>> rdd.mapPartitionsWithIndex(f).sum()
+ 6
+ """
+ return PipelinedRDD(self, f, preservesPartitioning)
def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
"""
+ Deprecated: use mapPartitionsWithIndex instead.
+
Return a new RDD by applying a function to each partition of this RDD,
while tracking the index of the original partition.
@@ -203,7 +218,9 @@ class RDD(object):
>>> rdd.mapPartitionsWithSplit(f).sum()
6
"""
- return PipelinedRDD(self, f, preservesPartitioning)
+ warnings.warn("mapPartitionsWithSplit is deprecated; "
+ "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
+ return self.mapPartitionsWithIndex(f, preservesPartitioning)
def filter(self, f):
"""
@@ -235,7 +252,7 @@ class RDD(object):
>>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
[2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
"""
- return self.mapPartitionsWithSplit(RDDSampler(withReplacement, fraction, seed).func, True)
+ return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
# this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed):