You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Maciej Bryński (JIRA)" <ji...@apache.org> on 2017/10/23 08:55:00 UTC

[jira] [Resolved] (SPARK-21439) Cannot use Spark with Python ABCmeta (exception from cloudpickle)

     [ https://issues.apache.org/jira/browse/SPARK-21439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maciej Bryński resolved SPARK-21439.
------------------------------------
       Resolution: Duplicate
    Fix Version/s: 2.3.0

> Cannot use Spark with Python ABCmeta (exception from cloudpickle)
> -----------------------------------------------------------------
>
>                 Key: SPARK-21439
>                 URL: https://issues.apache.org/jira/browse/SPARK-21439
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 2.1.1
>            Reporter: Maciej Bryński
>             Fix For: 2.3.0
>
>
> I'm trying to use code that relies on ABCMeta.
> The following code raises an exception when run.
> {code}
> from abc import ABCMeta, abstractmethod
> class A(metaclass=ABCMeta):
>     @abstractmethod
>     def x(self):
>         """Abstract"""
>         
> class B(A):
>     def x(self):
>         return 10
> b = B()
> sc.range(10).map(lambda x: b.x()).collect()
> {code}
> Exception:
> {code}
> ---------------------------------------------------------------------------
> AttributeError                            Traceback (most recent call last)
> /opt/spark/python/pyspark/cloudpickle.py in dump(self, obj)
>     146         try:
> --> 147             return Pickler.dump(self, obj)
>     148         except RuntimeError as e:
> /usr/lib/python3.4/pickle.py in dump(self, obj)
>     409             self.framer.start_framing()
> --> 410         self.save(obj)
>     411         self.write(STOP)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     741         for element in obj:
> --> 742             save(element)
>     743 
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
>     253             if klass is None or klass is not obj:
> --> 254                 self.save_function_tuple(obj)
>     255                 return
> /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
>     290         save(_make_skel_func)
> --> 291         save((code, closure, base_globals))
>     292         write(pickle.REDUCE)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     726             for element in obj:
> --> 727                 save(element)
>     728             # Subtle.  Same as in the big comment below.
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_list(self, obj)
>     771         self.memoize(obj)
> --> 772         self._batch_appends(obj)
>     773 
> /usr/lib/python3.4/pickle.py in _batch_appends(self, items)
>     795                 for x in tmp:
> --> 796                     save(x)
>     797                 write(APPENDS)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
>     253             if klass is None or klass is not obj:
> --> 254                 self.save_function_tuple(obj)
>     255                 return
> /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
>     290         save(_make_skel_func)
> --> 291         save((code, closure, base_globals))
>     292         write(pickle.REDUCE)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     726             for element in obj:
> --> 727                 save(element)
>     728             # Subtle.  Same as in the big comment below.
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_list(self, obj)
>     771         self.memoize(obj)
> --> 772         self._batch_appends(obj)
>     773 
> /usr/lib/python3.4/pickle.py in _batch_appends(self, items)
>     798             elif n:
> --> 799                 save(tmp[0])
>     800                 write(APPEND)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
>     247             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
> --> 248             self.save_function_tuple(obj)
>     249             return
> /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
>     295         # save the rest of the func data needed by _fill_function
> --> 296         save(f_globals)
>     297         save(defaults)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_dict(self, obj)
>     811         self.memoize(obj)
> --> 812         self._batch_setitems(obj.items())
>     813 
> /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
>     842                 save(k)
> --> 843                 save(v)
>     844                 write(SETITEM)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     521         # Save the reduce() output and finally memoize the object
> --> 522         self.save_reduce(obj=obj, *rv)
>     523 
> /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
>     565             args = args[1:]
> --> 566             save(cls)
>     567 
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     490             if issc:
> --> 491                 self.save_global(obj)
>     492                 return
> /opt/spark/python/pyspark/cloudpickle.py in save_global(self, obj, name, pack)
>     415             self.save(_load_class)
> --> 416             self.save_reduce(typ, (obj.__name__, obj.__bases__, {"__doc__": obj.__doc__}), obj=obj)
>     417             d.pop('__doc__', None)
> /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
>     580             save(func)
> --> 581             save(args)
>     582             write(pickle.REDUCE)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     726             for element in obj:
> --> 727                 save(element)
>     728             # Subtle.  Same as in the big comment below.
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     726             for element in obj:
> --> 727                 save(element)
>     728             # Subtle.  Same as in the big comment below.
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     490             if issc:
> --> 491                 self.save_global(obj)
>     492                 return
> /opt/spark/python/pyspark/cloudpickle.py in save_global(self, obj, name, pack)
>     430                 dd[k] = v
> --> 431             self.save(dd)
>     432             self.write(pickle.TUPLE2)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_dict(self, obj)
>     811         self.memoize(obj)
> --> 812         self._batch_setitems(obj.items())
>     813 
> /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
>     837                     save(k)
> --> 838                     save(v)
>     839                 write(SETITEMS)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     521         # Save the reduce() output and finally memoize the object
> --> 522         self.save_reduce(obj=obj, *rv)
>     523 
> /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
>     598         if state is not None:
> --> 599             save(state)
>     600             write(pickle.BUILD)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_dict(self, obj)
>     811         self.memoize(obj)
> --> 812         self._batch_setitems(obj.items())
>     813 
> /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
>     837                     save(k)
> --> 838                     save(v)
>     839                 write(SETITEMS)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
>     253             if klass is None or klass is not obj:
> --> 254                 self.save_function_tuple(obj)
>     255                 return
> /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
>     296         save(f_globals)
> --> 297         save(defaults)
>     298         save(dct)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
>     726             for element in obj:
> --> 727                 save(element)
>     728             # Subtle.  Same as in the big comment below.
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     521         # Save the reduce() output and finally memoize the object
> --> 522         self.save_reduce(obj=obj, *rv)
>     523 
> /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
>     565             args = args[1:]
> --> 566             save(cls)
>     567 
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_global(self, obj, name, pack)
>     430                 dd[k] = v
> --> 431             self.save(dd)
>     432             self.write(pickle.TUPLE2)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /usr/lib/python3.4/pickle.py in save_dict(self, obj)
>     811         self.memoize(obj)
> --> 812         self._batch_setitems(obj.items())
>     813 
> /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
>     837                     save(k)
> --> 838                     save(v)
>     839                 write(SETITEMS)
> /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
>     476         if f is not None:
> --> 477             f(self, obj) # Call unbound method with explicit self
>     478             return
> /opt/spark/python/pyspark/cloudpickle.py in save_builtin_function(self, obj)
>     366             return self.save_global(obj)
> --> 367         return self.save_function(obj)
>     368     dispatch[types.BuiltinFunctionType] = save_builtin_function
> /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
>     245         # reference (as is done in default pickler), via save_function_tuple.
> --> 246         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
>     247             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
> AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
> During handling of the above exception, another exception occurred:
> AttributeError                            Traceback (most recent call last)
> <ipython-input-8-9ea6e84ab4cc> in <module>()
> ----> 1 sc.range(10).map(lambda x: b.x()).collect()
> /opt/spark/python/pyspark/rdd.py in collect(self)
>     806         """
>     807         with SCCallSiteSync(self.context) as css:
> --> 808             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>     809         return list(_load_from_socket(port, self._jrdd_deserializer))
>     810 
> /opt/spark/python/pyspark/rdd.py in _jrdd(self)
>    2438 
>    2439         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
> -> 2440                                       self._jrdd_deserializer, profiler)
>    2441         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
>    2442                                              self.preservesPartitioning)
> /opt/spark/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
>    2371     assert serializer, "serializer should not be empty"
>    2372     command = (func, profiler, deserializer, serializer)
> -> 2373     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
>    2374     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
>    2375                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)
> /opt/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
>    2357     # the serialized command will be compressed by broadcast
>    2358     ser = CloudPickleSerializer()
> -> 2359     pickled_command = ser.dumps(command)
>    2360     if len(pickled_command) > (1 << 20):  # 1M
>    2361         # The broadcast will have same life cycle as created PythonRDD
> /opt/spark/python/pyspark/serializers.py in dumps(self, obj)
>     458 
>     459     def dumps(self, obj):
> --> 460         return cloudpickle.dumps(obj, 2)
>     461 
>     462 
> /opt/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
>     701 
>     702     cp = CloudPickler(file,protocol)
> --> 703     cp.dump(obj)
>     704 
>     705     return file.getvalue()
> /opt/spark/python/pyspark/cloudpickle.py in dump(self, obj)
>     153             raise
>     154         except Exception as e:
> --> 155             if "'i' format requires" in e.message:
>     156                 msg = "Object too large to serialize: " + e.message
>     157             else:
> AttributeError: 'AttributeError' object has no attribute 'message'
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org