Posted to issues@spark.apache.org by "Mark Baker (JIRA)" <ji...@apache.org> on 2014/06/18 05:54:04 UTC

[jira] [Commented] (SPARK-791) [pyspark] operator.getattr not serialized

    [ https://issues.apache.org/jira/browse/SPARK-791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14034779#comment-14034779 ] 

Mark Baker commented on SPARK-791:
----------------------------------

I began porting PySpark to Python 3, but with my modest Python-fu I hit a wall at cloudpickle. Dill supports Python 3, so it seems like a big win in that direction too.
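
For context (an illustration, not part of the original comment), a minimal sketch of the dill API referred to above: dill exposes the same dumps/loads interface as pickle but registers handlers for callables such as operator.itemgetter, which the stock pickler rejects — this is what makes it attractive as a cloudpickle replacement here.

{code:python}
# Minimal sketch: round-trip an operator.itemgetter through dill.
# Stock pickle raises "can't pickle itemgetter objects" for the same input.
import operator

import dill

getter = operator.itemgetter(0, 1)
restored = dill.loads(dill.dumps(getter))
print(restored(['1', '1', '1']))  # -> ('1', '1')
{code}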

> [pyspark] operator.getattr not serialized
> -----------------------------------------
>
>                 Key: SPARK-791
>                 URL: https://issues.apache.org/jira/browse/SPARK-791
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 0.7.2, 0.9.0
>            Reporter: Jim Blomo
>            Priority: Minor
>
> Using operator.itemgetter as a function in map seems to confuse the serialization process in pyspark.  I'm using itemgetter to return tuples, which fails with a TypeError (details below).  Using an equivalent lambda function returns the correct result.
> Create a test file:
> {code:sh}
> echo 1,1 > test.txt
> {code}
> Then try mapping it to a tuple:
> {code:python}
> import csv
> sc.textFile("test.txt").mapPartitions(csv.reader).map(lambda l: (l[0],l[1])).first()
> Out[7]: ('1', '1')
> {code}
> But this does not work when using operator.itemgetter:
> {code:python}
> import operator
> sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0,1)).first()
> # TypeError: list indices must be integers, not tuple
> {code}
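> A plausible local reproduction of the same TypeError, outside Spark (an illustration, not from the original report): the message suggests the worker ends up indexing the row with the tuple (0, 1) rather than with each index in turn, which is what would happen if deserialization collapsed the getter's two indices into a single tuple argument.
> {code:python}
> import operator
>
> row = ['1', '1']
> # Intended behaviour: fetch elements 0 and 1 separately.
> operator.itemgetter(0, 1)(row)    # -> ('1', '1')
> # A mis-reconstructed getter that received one tuple argument instead:
> operator.itemgetter((0, 1))(row)  # TypeError: list indices must be integers, not tuple
> {code}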
> This is running with git master, commit 6d60fe571a405eb9306a2be1817901316a46f892
> IPython 0.13.2 
> java version "1.7.0_25"
> Scala code runner version 2.9.1 
> Ubuntu 12.04
> Full debug output:
> {code:python}
> In [9]: sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0,1)).first()
> 13/07/04 16:19:49 INFO storage.MemoryStore: ensureFreeSpace(33632) called with curMem=201792, maxMem=339585269
> 13/07/04 16:19:49 INFO storage.MemoryStore: Block broadcast_6 stored as values to memory (estimated size 32.8 KB, free 323.6 MB)
> 13/07/04 16:19:49 INFO mapred.FileInputFormat: Total input paths to process : 1
> 13/07/04 16:19:49 INFO spark.SparkContext: Starting job: takePartition at NativeMethodAccessorImpl.java:-2
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Got job 4 (takePartition at NativeMethodAccessorImpl.java:-2) with 1 output partitions (allowLocal=true)
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Final stage: Stage 4 (PythonRDD at NativeConstructorAccessorImpl.java:-2)
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Parents of final stage: List()
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Missing parents: List()
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Computing the requested partition locally
> 13/07/04 16:19:49 INFO scheduler.DAGScheduler: Failed to run takePartition at NativeMethodAccessorImpl.java:-2
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-9-1fdb3e7a8ac7> in <module>()
> ----> 1 sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0,1)).first()
> /home/jim/src/spark/python/pyspark/rdd.pyc in first(self)
>     389         2
>     390         """
> --> 391         return self.take(1)[0]
>     392 
>     393     def saveAsTextFile(self, path):
> /home/jim/src/spark/python/pyspark/rdd.pyc in take(self, num)
>     372         items = []
>     373         for partition in range(self._jrdd.splits().size()):
> --> 374             iterator = self.ctx._takePartition(self._jrdd.rdd(), partition)
>     375             # Each item in the iterator is a string, Python object, batch of
>     376             # Python objects.  Regardless, it is sufficient to take `num`
> /home/jim/src/spark/python/lib/py4j0.7.egg/py4j/java_gateway.pyc in __call__(self, *args)
>     498         answer = self.gateway_client.send_command(command)
>     499         return_value = get_return_value(answer, self.gateway_client,
> --> 500                 self.target_id, self.name)
>     501 
>     502         for temp_arg in temp_args:
> /home/jim/src/spark/python/lib/py4j0.7.egg/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
> Py4JJavaError: An error occurred while calling z:spark.api.python.PythonRDD.takePartition.
> : spark.api.python.PythonException: Traceback (most recent call last):
>   File "/home/jim/src/spark/python/pyspark/worker.py", line 53, in main
>     for obj in func(split_index, iterator):
>   File "/home/jim/src/spark/python/pyspark/serializers.py", line 24, in batched
>     for item in iterator:
> TypeError: list indices must be integers, not tuple
> 	at spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:117)
> 	at spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:139)
> 	at spark.api.python.PythonRDD.compute(PythonRDD.scala:82)
> 	at spark.RDD.computeOrReadCheckpoint(RDD.scala:232)
> 	at spark.RDD.iterator(RDD.scala:221)
> 	at spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:423)
> 	at spark.scheduler.DAGScheduler$$anon$2.run(DAGScheduler.scala:410)
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)