You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2016/09/09 16:59:21 UTC

[jira] [Assigned] (SPARK-17472) Better error message for serialization failures of large objects in Python

     [ https://issues.apache.org/jira/browse/SPARK-17472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-17472:
------------------------------------

    Assignee: Apache Spark

> Better error message for serialization failures of large objects in Python
> --------------------------------------------------------------------------
>
>                 Key: SPARK-17472
>                 URL: https://issues.apache.org/jira/browse/SPARK-17472
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>            Reporter: Eric Liang
>            Assignee: Apache Spark
>            Priority: Minor
>
> {code}
> def run():
>   import numpy.random as nr
>   b = nr.bytes(8 * 1000000000)
>   sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
> run()
> {code}
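> Running this fails with the following error from pickle, because the closure captures {{b}}, a byte string of 8 * 1000000000 bytes, and pickle protocol 2 packs a string's length as a signed 32-bit integer: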
> {code}
> error: 'i' format requires -2147483648 <= number <= 2147483647
> ---------------------------------------------------------------------------
> error                                     Traceback (most recent call last)
> <ipython-input-14-ba73d84faba7> in <module>()
>       4   sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
>       5 
> ----> 6 run()
> <ipython-input-14-ba73d84faba7> in run()
>       2   import numpy.random as nr
>       3   b = nr.bytes(8 * 1000000000)
> ----> 4   sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
>       5 
>       6 run()
> /databricks/spark/python/pyspark/rdd.pyc in count(self)
>    1002         3
>    1003         """
> -> 1004         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>    1005 
>    1006     def stats(self):
> /databricks/spark/python/pyspark/rdd.pyc in sum(self)
>     993         6.0
>     994         """
> --> 995         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>     996 
>     997     def count(self):
> /databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
>     867         # zeroValue provided to each partition is unique from the one provided
>     868         # to the final reduce call
> --> 869         vals = self.mapPartitions(func).collect()
>     870         return reduce(op, vals, zeroValue)
>     871 
> /databricks/spark/python/pyspark/rdd.pyc in collect(self)
>     769         """
>     770         with SCCallSiteSync(self.context) as css:
> --> 771             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>     772         return list(_load_from_socket(port, self._jrdd_deserializer))
>     773 
> /databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
>    2377         command = (self.func, profiler, self._prev_jrdd_deserializer,
>    2378                    self._jrdd_deserializer)
> -> 2379         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
>    2380         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
>    2381                                              bytearray(pickled_cmd),
> /databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
>    2297     # the serialized command will be compressed by broadcast
>    2298     ser = CloudPickleSerializer()
> -> 2299     pickled_command = ser.dumps(command)
>    2300     if len(pickled_command) > (1 << 20):  # 1M
>    2301         # The broadcast will have same life cycle as created PythonRDD
> /databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
>     426 
>     427     def dumps(self, obj):
> --> 428         return cloudpickle.dumps(obj, 2)
>     429 
>     430 
> /databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
>     655 
>     656     cp = CloudPickler(file,protocol)
> --> 657     cp.dump(obj)
>     658 
>     659     return file.getvalue()
> /databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
>     105         self.inject_addons()
>     106         try:
> --> 107             return Pickler.dump(self, obj)
>     108         except RuntimeError as e:
>     109             if 'recursion' in e.args[0]:
> /usr/lib/python2.7/pickle.pyc in dump(self, obj)
>     222         if self.proto >= 2:
>     223             self.write(PROTO + chr(self.proto))
> --> 224         self.save(obj)
>     225         self.write(STOP)
>     226 
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
>     560         write(MARK)
>     561         for element in obj:
> --> 562             save(element)
>     563 
>     564         if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
>     202             klass = getattr(themodule, name, None)
>     203             if klass is None or klass is not obj:
> --> 204                 self.save_function_tuple(obj)
>     205                 return
>     206 
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
>     239         # create a skeleton function object and memoize it
>     240         save(_make_skel_func)
> --> 241         save((code, closure, base_globals))
>     242         write(pickle.REDUCE)
>     243         self.memoize(func)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
>     546         if n <= 3 and proto >= 2:
>     547             for element in obj:
> --> 548                 save(element)
>     549             # Subtle.  Same as in the big comment below.
>     550             if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_list(self, obj)
>     598 
>     599         self.memoize(obj)
> --> 600         self._batch_appends(iter(obj))
>     601 
>     602     dispatch[ListType] = save_list
> /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
>     631                 write(MARK)
>     632                 for x in tmp:
> --> 633                     save(x)
>     634                 write(APPENDS)
>     635             elif n:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
>     202             klass = getattr(themodule, name, None)
>     203             if klass is None or klass is not obj:
> --> 204                 self.save_function_tuple(obj)
>     205                 return
>     206 
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
>     239         # create a skeleton function object and memoize it
>     240         save(_make_skel_func)
> --> 241         save((code, closure, base_globals))
>     242         write(pickle.REDUCE)
>     243         self.memoize(func)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
>     546         if n <= 3 and proto >= 2:
>     547             for element in obj:
> --> 548                 save(element)
>     549             # Subtle.  Same as in the big comment below.
>     550             if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_list(self, obj)
>     598 
>     599         self.memoize(obj)
> --> 600         self._batch_appends(iter(obj))
>     601 
>     602     dispatch[ListType] = save_list
> /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
>     631                 write(MARK)
>     632                 for x in tmp:
> --> 633                     save(x)
>     634                 write(APPENDS)
>     635             elif n:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
>     202             klass = getattr(themodule, name, None)
>     203             if klass is None or klass is not obj:
> --> 204                 self.save_function_tuple(obj)
>     205                 return
>     206 
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
>     239         # create a skeleton function object and memoize it
>     240         save(_make_skel_func)
> --> 241         save((code, closure, base_globals))
>     242         write(pickle.REDUCE)
>     243         self.memoize(func)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
>     546         if n <= 3 and proto >= 2:
>     547             for element in obj:
> --> 548                 save(element)
>     549             # Subtle.  Same as in the big comment below.
>     550             if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_list(self, obj)
>     598 
>     599         self.memoize(obj)
> --> 600         self._batch_appends(iter(obj))
>     601 
>     602     dispatch[ListType] = save_list
> /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
>     631                 write(MARK)
>     632                 for x in tmp:
> --> 633                     save(x)
>     634                 write(APPENDS)
>     635             elif n:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
>     202             klass = getattr(themodule, name, None)
>     203             if klass is None or klass is not obj:
> --> 204                 self.save_function_tuple(obj)
>     205                 return
>     206 
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
>     239         # create a skeleton function object and memoize it
>     240         save(_make_skel_func)
> --> 241         save((code, closure, base_globals))
>     242         write(pickle.REDUCE)
>     243         self.memoize(func)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
>     546         if n <= 3 and proto >= 2:
>     547             for element in obj:
> --> 548                 save(element)
>     549             # Subtle.  Same as in the big comment below.
>     550             if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_list(self, obj)
>     598 
>     599         self.memoize(obj)
> --> 600         self._batch_appends(iter(obj))
>     601 
>     602     dispatch[ListType] = save_list
> /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
>     634                 write(APPENDS)
>     635             elif n:
> --> 636                 save(tmp[0])
>     637                 write(APPEND)
>     638             # else tmp is empty, and we're done
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
>     196         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
>     197             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
> --> 198             self.save_function_tuple(obj)
>     199             return
>     200         else:
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
>     239         # create a skeleton function object and memoize it
>     240         save(_make_skel_func)
> --> 241         save((code, closure, base_globals))
>     242         write(pickle.REDUCE)
>     243         self.memoize(func)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
>     546         if n <= 3 and proto >= 2:
>     547             for element in obj:
> --> 548                 save(element)
>     549             # Subtle.  Same as in the big comment below.
>     550             if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_list(self, obj)
>     598 
>     599         self.memoize(obj)
> --> 600         self._batch_appends(iter(obj))
>     601 
>     602     dispatch[ListType] = save_list
> /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
>     634                 write(APPENDS)
>     635             elif n:
> --> 636                 save(tmp[0])
>     637                 write(APPEND)
>     638             # else tmp is empty, and we're done
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
>     284         f = self.dispatch.get(t)
>     285         if f:
> --> 286             f(self, obj) # Call unbound method with explicit self
>     287             return
>     288 
> /usr/lib/python2.7/pickle.pyc in save_string(self, obj, pack)
>     484                 self.write(SHORT_BINSTRING + chr(n) + obj)
>     485             else:
> --> 486                 self.write(BINSTRING + pack("<i", n) + obj)
>     487         else:
>     488             self.write(STRING + repr(obj) + '\n')
> error: 'i' format requires -2147483648 <= number <= 2147483647
> {code}
> =======================================================
> {code}
> def run():
>   import numpy.random as nr
>   b = sc.broadcast(nr.bytes(8 * 1000000000))
>   sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
> run()
> {code}
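> Broadcasting the same value instead of capturing it in the closure fails while pickling the broadcast, with an equally unhelpful {{SystemError}}: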
> {code}
> ---------------------------------------------------------------------------
> SystemError                               Traceback (most recent call last)
> <ipython-input-14-53cbdb8ed528> in <module>()
>       4   sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
>       5 
> ----> 6 run()
> <ipython-input-14-53cbdb8ed528> in run()
>       1 def run():
>       2   import numpy.random as nr
> ----> 3   b = sc.broadcast(nr.bytes(8 * 1000000000))
>       4   sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
>       5 
> /databricks/spark/python/pyspark/context.py in broadcast(self, value)
>     741         be sent to each cluster only once.
>     742         """
> --> 743         return Broadcast(self, value, self._pickled_broadcast_vars)
>     744 
>     745     def accumulator(self, value, accum_param=None):
> /databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path)
>      68         if sc is not None:
>      69             f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
> ---> 70             self._path = self.dump(value, f)
>      71             self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
>      72             self._pickle_registry = pickle_registry
> /databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
>      76 
>      77     def dump(self, value, f):
> ---> 78         pickle.dump(value, f, 2)
>      79         f.close()
>      80         return f.name
> SystemError: error return without exception set
> {code}
> In both cases, PySpark should raise a clearer error stating that the task (closure) or broadcast value could not be serialized, and, where applicable, that the object was too large to pickle.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org