Posted to issues@spark.apache.org by "Davies Liu (JIRA)" <ji...@apache.org> on 2015/09/15 04:50:45 UTC
[jira] [Resolved] (SPARK-10542) The PySpark 1.5 closure serializer can't serialize a namedtuple instance.
[ https://issues.apache.org/jira/browse/SPARK-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Davies Liu resolved SPARK-10542.
--------------------------------
Resolution: Fixed
Fix Version/s: 1.5.1, 1.6.0
Target Version/s: 1.6.0, 1.5.1 (was: 1.5.1)
> The PySpark 1.5 closure serializer can't serialize a namedtuple instance.
> --------------------------------------------------------------------------
>
> Key: SPARK-10542
> URL: https://issues.apache.org/jira/browse/SPARK-10542
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.5.0
> Reporter: Davies Liu
> Assignee: Davies Liu
> Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> Code to reproduce the bug:
> {code}
> from collections import namedtuple
> PowerPlantRow = namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
> rdd = sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
> rdd.count()
> {code}
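> Until the fix (shipped in 1.5.1 and 1.6.0) is available, one possible workaround is to keep the generated namedtuple class out of the closure that is shipped to the executors, since plain tuples pickle without trouble. This is only a sketch: it assumes the shell's SparkContext sc, as in the snippet above, and that named field access is needed only on the driver.
> {code}
> from collections import namedtuple
>
> PowerPlantRow = namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
>
> # The lambda below never references PowerPlantRow, so the closure
> # serializer does not have to pickle the generated class at all.
> rdd = sc.parallelize([1]).map(lambda x: (1.0, 2.0, 3.0, 4.0, 5.0))
> rdd.count()
>
> # Rebuild the namedtuples on the driver if named access is needed.
> rows = [PowerPlantRow(*t) for t in rdd.collect()]
> {code}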
> Error message on Spark 1.5:
> {code}
> AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
> ---------------------------------------------------------------------------
> AttributeError Traceback (most recent call last)
> <ipython-input-5-59448e31019f> in <module>()
> 2 PowerPlantRow=namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
> 3 rdd=sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
> ----> 4 rdd.count()
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in count(self)
> 1004 3
> 1005 """
> -> 1006 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> 1007
> 1008 def stats(self):
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in sum(self)
> 995 6.0
> 996 """
> --> 997 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
> 998
> 999 def count(self):
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
> 869 # zeroValue provided to each partition is unique from the one provided
> 870 # to the final reduce call
> --> 871 vals = self.mapPartitions(func).collect()
> 872 return reduce(op, vals, zeroValue)
> 873
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in collect(self)
> 771 """
> 772 with SCCallSiteSync(self.context) as css:
> --> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
> 774 return list(_load_from_socket(port, self._jrdd_deserializer))
> 775
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
> 2383 command = (self.func, profiler, self._prev_jrdd_deserializer,
> 2384 self._jrdd_deserializer)
> -> 2385 pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
> 2386 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
> 2387 bytearray(pickled_cmd),
> /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
> 2303 # the serialized command will be compressed by broadcast
> 2304 ser = CloudPickleSerializer()
> -> 2305 pickled_command = ser.dumps(command)
> 2306 if len(pickled_command) > (1 << 20): # 1M
> 2307 # The broadcast will have same life cycle as created PythonRDD
> /home/ubuntu/databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
> 425
> 426 def dumps(self, obj):
> --> 427 return cloudpickle.dumps(obj, 2)
> 428
> 429
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
> 639
> 640 cp = CloudPickler(file,protocol)
> --> 641 cp.dump(obj)
> 642
> 643 return file.getvalue()
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
> 105 self.inject_addons()
> 106 try:
> --> 107 return Pickler.dump(self, obj)
> 108 except RuntimeError as e:
> 109 if 'recursion' in e.args[0]:
> /usr/lib/python2.7/pickle.pyc in dump(self, obj)
> 222 if self.proto >= 2:
> 223 self.write(PROTO + chr(self.proto))
> --> 224 self.save(obj)
> 225 self.write(STOP)
> 226
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
> 560 write(MARK)
> 561 for element in obj:
> --> 562 save(element)
> 563
> 564 if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> ... skipped 23125 bytes ...
> 650
> 651 dispatch[DictionaryType] = save_dict
> /usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
> 684 k, v = tmp[0]
> 685 save(k)
> --> 686 save(v)
> 687 write(SETITEM)
> 688 # else tmp is empty, and we're done
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_global(self, obj, name, pack)
> 367 v = v.__func__
> 368 dd[k] = v
> --> 369 self.save(dd)
> 370 self.write(pickle.TUPLE2)
> 371 self.write(pickle.REDUCE)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_dict(self, obj)
> 647
> 648 self.memoize(obj)
> --> 649 self._batch_setitems(obj.iteritems())
> 650
> 651 dispatch[DictionaryType] = save_dict
> /usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
> 679 for k, v in tmp:
> 680 save(k)
> --> 681 save(v)
> 682 write(SETITEMS)
> 683 elif n:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
> 191 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
> 192 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
> --> 193 self.save_function_tuple(obj)
> 194 return
> 195 else:
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
> 240 # save the rest of the func data needed by _fill_function
> 241 save(f_globals)
> --> 242 save(defaults)
> 243 save(dct)
> 244 write(pickle.TUPLE)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
> 546 if n <= 3 and proto >= 2:
> 547 for element in obj:
> --> 548 save(element)
> 549 # Subtle. Same as in the big comment below.
> 550 if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_builtin_function(self, obj)
> 313 if obj.__module__ is "__builtin__":
> 314 return self.save_global(obj)
> --> 315 return self.save_function(obj)
> 316 dispatch[types.BuiltinFunctionType] = save_builtin_function
> 317
> /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
> 189 # we'll pickle the actual function object rather than simply saving a
> 190 # reference (as is done in default pickler), via save_function_tuple.
> --> 191 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
> 192 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
> 193 self.save_function_tuple(obj)
> AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
> {code}
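> The trigger is reproducible without Spark. On Python 2.7 (the interpreter in the traceback above), the class generated by namedtuple has a _make classmethod whose default arguments include the builtins tuple.__new__ and len. Builtin functions are implemented in C and have no __code__ attribute, which is exactly what cloudpickle's save_function reads once save_builtin_function falls through to it. A minimal sketch, assuming the stock Python 2.7 collections module:
> {code}
> from collections import namedtuple
>
> PowerPlantRow = namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
>
> # On Python 2.7, _make is generated with defaults (tuple.__new__, len),
> # so pickling the class by value means pickling those builtins too.
> print(PowerPlantRow._make.__func__.__defaults__)
>
> # Builtin functions expose no __code__, so a pickler path that reads
> # obj.__code__ unconditionally raises the AttributeError seen above.
> print(hasattr(tuple.__new__, "__code__"))  # False
> try:
>     tuple.__new__.__code__
> except AttributeError as e:
>     print(e)
> {code}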
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)