You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/07/29 10:02:25 UTC
git commit: [SPARK-791] [PySpark] fix pickle itemgetter with cloudpickle
Repository: spark
Updated Branches:
refs/heads/master ccd5ab5f8 -> 92ef02626
[SPARK-791] [PySpark] fix pickle itemgetter with cloudpickle
Fix the problem with pickling operator.itemgetter objects that have multiple indices.
Author: Davies Liu <da...@gmail.com>
Closes #1627 from davies/itemgetter and squashes the following commits:
aabd7fa [Davies Liu] fix pickle itemgetter with cloudpickle
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92ef0262
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92ef0262
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92ef0262
Branch: refs/heads/master
Commit: 92ef02626e793ea853cced4cbfee316f0b748ed7
Parents: ccd5ab5
Author: Davies Liu <da...@gmail.com>
Authored: Tue Jul 29 01:02:18 2014 -0700
Committer: Josh Rosen <jo...@apache.org>
Committed: Tue Jul 29 01:02:18 2014 -0700
----------------------------------------------------------------------
python/pyspark/cloudpickle.py | 5 +++--
python/pyspark/tests.py | 6 ++++++
2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/92ef0262/python/pyspark/cloudpickle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 4fda2a9..6806248 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -560,8 +560,9 @@ class CloudPickler(pickle.Pickler):
]
- itemgetter_obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
- return self.save_reduce(operator.itemgetter, (itemgetter_obj.item,))
+ obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
+ return self.save_reduce(operator.itemgetter,
+ obj.item if obj.nitems > 1 else (obj.item,))
if PyObject_HEAD:
dispatch[operator.itemgetter] = save_itemgetter
http://git-wip-us.apache.org/repos/asf/spark/blob/92ef0262/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 6dee7dc..8486c85 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -284,6 +284,12 @@ class TestRDDFunctions(PySparkTestCase):
self.assertEqual(set([2]), sets[3])
self.assertEqual(set([1, 3]), sets[5])
+ def test_itemgetter(self):
+ rdd = self.sc.parallelize([range(10)])
+ from operator import itemgetter
+ self.assertEqual([1], rdd.map(itemgetter(1)).collect())
+ self.assertEqual([(2, 3)], rdd.map(itemgetter(2, 3)).collect())
+
class TestIO(PySparkTestCase):