You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Davies Liu (JIRA)" <ji...@apache.org> on 2015/09/15 04:50:45 UTC

[jira] [Resolved] (SPARK-10542) The PySpark 1.5 closure serializer can't serialize a namedtuple instance.

     [ https://issues.apache.org/jira/browse/SPARK-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Davies Liu resolved SPARK-10542.
--------------------------------
          Resolution: Fixed
       Fix Version/s: 1.5.1
                      1.6.0
    Target Version/s: 1.6.0, 1.5.1  (was: 1.5.1)

> The PySpark 1.5 closure serializer can't serialize a namedtuple instance.
> -------------------------------------------------------------------------
>
>                 Key: SPARK-10542
>                 URL: https://issues.apache.org/jira/browse/SPARK-10542
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.5.0
>            Reporter: Davies Liu
>            Assignee: Davies Liu
>            Priority: Critical
>             Fix For: 1.6.0, 1.5.1
>
>
> Code to Reproduce Bug:
> {code}
> from collections import namedtuple
> PowerPlantRow=namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
> rdd=sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
> rdd.count()
> {code}
> Error message on Spark 1.5:
> {code}
> AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
> ---------------------------------------------------------------------------
> AttributeError                            Traceback (most recent call last)
> <ipython-input-5-59448e31019f> in <module>()
>       2 PowerPlantRow=namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
>       3 rdd=sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
> ----> 4 rdd.count()
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in count(self)
>    1004         3
>    1005         """
> -> 1006         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>    1007 
>    1008     def stats(self):
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in sum(self)
>     995         6.0
>     996         """
> --> 997         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>     998 
>     999     def count(self):
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
>     869         # zeroValue provided to each partition is unique from the one provided
>     870         # to the final reduce call
> --> 871         vals = self.mapPartitions(func).collect()
>     872         return reduce(op, vals, zeroValue)
>     873 
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in collect(self)
>     771         """
>     772         with SCCallSiteSync(self.context) as css:
> --> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>     774         return list(_load_from_socket(port, self._jrdd_deserializer))
>     775 
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
>    2383         command = (self.func, profiler, self._prev_jrdd_deserializer,
>    2384                    self._jrdd_deserializer)
> -> 2385         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
>    2386         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
>    2387                                              bytearray(pickled_cmd),
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
>    2303     # the serialized command will be compressed by broadcast
>    2304     ser = CloudPickleSerializer()
> -> 2305     pickled_command = ser.dumps(command)
>    2306     if len(pickled_command) > (1 << 20):  # 1M
>    2307         # The broadcast will have same life cycle as created PythonRDD
> /home/ubuntu/databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
>     425 
>     426     def dumps(self, obj):
> --> 427         return cloudpickle.dumps(obj, 2)
>     428 
>     429 
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
>     639 
>     640     cp = CloudPickler(file,protocol)
> --> 641     cp.dump(obj)
>     642 
>     643     return file.getvalue()
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
>     105         self.inject_addons()
>     106         try:
> --> 107             return Pickler.dump(self, obj)
>     108         except RuntimeError as e:
>     109             if 'recursion' in e.args[0]:
> /usr/lib/python2.7/pickle.pyc in dump(self, obj)
>     222         if self.proto >= 2:
>     223             self.write(PROTO + chr(self.proto))
> --> 224         self.save(obj)
>     225         self.write(STOP)
>     226 
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
>     560         write(MARK)
>     561         for element in obj:
> --> 562             save(element)
>     563 
>     564         if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> ... skipped 23125 bytes ...
>     650 
>     651     dispatch[DictionaryType] = save_dict
> /usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
>     684                 k, v = tmp[0]
>     685                 save(k)
> --> 686                 save(v)
>     687                 write(SETITEM)
>     688             # else tmp is empty, and we're done
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_global(self, obj, name, pack)
>     367                     v = v.__func__
>     368                 dd[k] = v
> --> 369             self.save(dd)
>     370             self.write(pickle.TUPLE2)
>     371             self.write(pickle.REDUCE)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_dict(self, obj)
>     647 
>     648         self.memoize(obj)
> --> 649         self._batch_setitems(obj.iteritems())
>     650 
>     651     dispatch[DictionaryType] = save_dict
> /usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
>     679                 for k, v in tmp:
>     680                     save(k)
> --> 681                     save(v)
>     682                 write(SETITEMS)
>     683             elif n:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
>     191         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
>     192             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
> --> 193             self.save_function_tuple(obj)
>     194             return
>     195         else:
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
>     240         # save the rest of the func data needed by _fill_function
>     241         save(f_globals)
> --> 242         save(defaults)
>     243         save(dct)
>     244         write(pickle.TUPLE)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
>     546         if n <= 3 and proto >= 2:
>     547             for element in obj:
> --> 548                 save(element)
>     549             # Subtle.  Same as in the big comment below.
>     550             if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_builtin_function(self, obj)
>     313         if obj.__module__ is "__builtin__":
>     314             return self.save_global(obj)
> --> 315         return self.save_function(obj)
>     316     dispatch[types.BuiltinFunctionType] = save_builtin_function
>     317 
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
>     189         # we'll pickle the actual function object rather than simply saving a
>     190         # reference (as is done in default pickler), via save_function_tuple.
> --> 191         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
>     192             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
>     193             self.save_function_tuple(obj)
> AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org