Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2016/09/09 16:59:21 UTC
[jira] [Assigned] (SPARK-17472) Better error message for serialization failures of large objects in Python
[ https://issues.apache.org/jira/browse/SPARK-17472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-17472:
------------------------------------
Assignee: Apache Spark
> Better error message for serialization failures of large objects in Python
> --------------------------------------------------------------------------
>
> Key: SPARK-17472
> URL: https://issues.apache.org/jira/browse/SPARK-17472
> Project: Spark
> Issue Type: Improvement
> Components: PySpark
> Reporter: Eric Liang
> Assignee: Apache Spark
> Priority: Minor
>
> {code}
> def run():
>     import numpy.random as nr
>     b = nr.bytes(8 * 1000000000)
>     sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
> run()
> {code}
> Gives you the following error from pickle:
> {code}
> error: 'i' format requires -2147483648 <= number <= 2147483647
> ---------------------------------------------------------------------------
> error Traceback (most recent call last)
> <ipython-input-14-ba73d84faba7> in <module>()
> 4 sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
> 5
> ----> 6 run()
> <ipython-input-14-ba73d84faba7> in run()
> 2 import numpy.random as nr
> 3 b = nr.bytes(8 * 1000000000)
> ----> 4 sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()
> 5
> 6 run()
> /databricks/spark/python/pyspark/rdd.pyc in count(self)
> 1002 3
> 1003 """
> -> 1004 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> 1005
> 1006 def stats(self):
> /databricks/spark/python/pyspark/rdd.pyc in sum(self)
> 993 6.0
> 994 """
> --> 995 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
> 996
> 997 def count(self):
> /databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
> 867 # zeroValue provided to each partition is unique from the one provided
> 868 # to the final reduce call
> --> 869 vals = self.mapPartitions(func).collect()
> 870 return reduce(op, vals, zeroValue)
> 871
> /databricks/spark/python/pyspark/rdd.pyc in collect(self)
> 769 """
> 770 with SCCallSiteSync(self.context) as css:
> --> 771 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
> 772 return list(_load_from_socket(port, self._jrdd_deserializer))
> 773
> /databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
> 2377 command = (self.func, profiler, self._prev_jrdd_deserializer,
> 2378 self._jrdd_deserializer)
> -> 2379 pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
> 2380 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
> 2381 bytearray(pickled_cmd),
> /databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
> 2297 # the serialized command will be compressed by broadcast
> 2298 ser = CloudPickleSerializer()
> -> 2299 pickled_command = ser.dumps(command)
> 2300 if len(pickled_command) > (1 << 20): # 1M
> 2301 # The broadcast will have same life cycle as created PythonRDD
> /databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
> 426
> 427 def dumps(self, obj):
> --> 428 return cloudpickle.dumps(obj, 2)
> 429
> 430
> /databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
> 655
> 656 cp = CloudPickler(file,protocol)
> --> 657 cp.dump(obj)
> 658
> 659 return file.getvalue()
> /databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
> 105 self.inject_addons()
> 106 try:
> --> 107 return Pickler.dump(self, obj)
> 108 except RuntimeError as e:
> 109 if 'recursion' in e.args[0]:
> /usr/lib/python2.7/pickle.pyc in dump(self, obj)
> 222 if self.proto >= 2:
> 223 self.write(PROTO + chr(self.proto))
> --> 224 self.save(obj)
> 225 self.write(STOP)
> 226
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
> 560 write(MARK)
> 561 for element in obj:
> --> 562 save(element)
> 563
> 564 if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
> 202 klass = getattr(themodule, name, None)
> 203 if klass is None or klass is not obj:
> --> 204 self.save_function_tuple(obj)
> 205 return
> 206
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
> 239 # create a skeleton function object and memoize it
> 240 save(_make_skel_func)
> --> 241 save((code, closure, base_globals))
> 242 write(pickle.REDUCE)
> 243 self.memoize(func)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
> 546 if n <= 3 and proto >= 2:
> 547 for element in obj:
> --> 548 save(element)
> 549 # Subtle. Same as in the big comment below.
> 550 if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_list(self, obj)
> 598
> 599 self.memoize(obj)
> --> 600 self._batch_appends(iter(obj))
> 601
> 602 dispatch[ListType] = save_list
> /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
> 631 write(MARK)
> 632 for x in tmp:
> --> 633 save(x)
> 634 write(APPENDS)
> 635 elif n:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
> 202 klass = getattr(themodule, name, None)
> 203 if klass is None or klass is not obj:
> --> 204 self.save_function_tuple(obj)
> 205 return
> 206
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
> 239 # create a skeleton function object and memoize it
> 240 save(_make_skel_func)
> --> 241 save((code, closure, base_globals))
> 242 write(pickle.REDUCE)
> 243 self.memoize(func)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
> 546 if n <= 3 and proto >= 2:
> 547 for element in obj:
> --> 548 save(element)
> 549 # Subtle. Same as in the big comment below.
> 550 if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_list(self, obj)
> 598
> 599 self.memoize(obj)
> --> 600 self._batch_appends(iter(obj))
> 601
> 602 dispatch[ListType] = save_list
> /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
> 631 write(MARK)
> 632 for x in tmp:
> --> 633 save(x)
> 634 write(APPENDS)
> 635 elif n:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
> 202 klass = getattr(themodule, name, None)
> 203 if klass is None or klass is not obj:
> --> 204 self.save_function_tuple(obj)
> 205 return
> 206
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
> 239 # create a skeleton function object and memoize it
> 240 save(_make_skel_func)
> --> 241 save((code, closure, base_globals))
> 242 write(pickle.REDUCE)
> 243 self.memoize(func)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
> 546 if n <= 3 and proto >= 2:
> 547 for element in obj:
> --> 548 save(element)
> 549 # Subtle. Same as in the big comment below.
> 550 if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_list(self, obj)
> 598
> 599 self.memoize(obj)
> --> 600 self._batch_appends(iter(obj))
> 601
> 602 dispatch[ListType] = save_list
> /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
> 631 write(MARK)
> 632 for x in tmp:
> --> 633 save(x)
> 634 write(APPENDS)
> 635 elif n:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
> 202 klass = getattr(themodule, name, None)
> 203 if klass is None or klass is not obj:
> --> 204 self.save_function_tuple(obj)
> 205 return
> 206
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
> 239 # create a skeleton function object and memoize it
> 240 save(_make_skel_func)
> --> 241 save((code, closure, base_globals))
> 242 write(pickle.REDUCE)
> 243 self.memoize(func)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
> 546 if n <= 3 and proto >= 2:
> 547 for element in obj:
> --> 548 save(element)
> 549 # Subtle. Same as in the big comment below.
> 550 if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_list(self, obj)
> 598
> 599 self.memoize(obj)
> --> 600 self._batch_appends(iter(obj))
> 601
> 602 dispatch[ListType] = save_list
> /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
> 634 write(APPENDS)
> 635 elif n:
> --> 636 save(tmp[0])
> 637 write(APPEND)
> 638 # else tmp is empty, and we're done
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
> 196 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
> 197 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
> --> 198 self.save_function_tuple(obj)
> 199 return
> 200 else:
> /databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
> 239 # create a skeleton function object and memoize it
> 240 save(_make_skel_func)
> --> 241 save((code, closure, base_globals))
> 242 write(pickle.REDUCE)
> 243 self.memoize(func)
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
> 546 if n <= 3 and proto >= 2:
> 547 for element in obj:
> --> 548 save(element)
> 549 # Subtle. Same as in the big comment below.
> 550 if id(obj) in memo:
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_list(self, obj)
> 598
> 599 self.memoize(obj)
> --> 600 self._batch_appends(iter(obj))
> 601
> 602 dispatch[ListType] = save_list
> /usr/lib/python2.7/pickle.pyc in _batch_appends(self, items)
> 634 write(APPENDS)
> 635 elif n:
> --> 636 save(tmp[0])
> 637 write(APPEND)
> 638 # else tmp is empty, and we're done
> /usr/lib/python2.7/pickle.pyc in save(self, obj)
> 284 f = self.dispatch.get(t)
> 285 if f:
> --> 286 f(self, obj) # Call unbound method with explicit self
> 287 return
> 288
> /usr/lib/python2.7/pickle.pyc in save_string(self, obj, pack)
> 484 self.write(SHORT_BINSTRING + chr(n) + obj)
> 485 else:
> --> 486 self.write(BINSTRING + pack("<i", n) + obj)
> 487 else:
> 488 self.write(STRING + repr(obj) + '\n')
> error: 'i' format requires -2147483648 <= number <= 2147483647
> {code}
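> The root cause is visible in the last frame: pickle protocol 2's {{save_string}} writes the byte-string length with {{pack("<i", n)}}, a signed 32-bit field, so any string longer than 2 GiB overflows it. A minimal sketch of just the overflow, with no Spark and no 8 GB allocation required (the length below simply mirrors the repro above):
> {code}
> import struct
>
> # pickle protocol 2 frames a BINSTRING as pack("<i", n) + payload, so the
> # length n must fit in a signed 32-bit int (at most 2**31 - 1, ~2 GiB).
> n = 8 * 1000000000  # size of the captured numpy byte string in the repro
> struct.pack("<i", n)  # struct.error: 'i' format requires -2147483648 <= number <= 2147483647
> {code}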
> =======================================================
> {code}
> def run():
>     import numpy.random as nr
>     b = sc.broadcast(nr.bytes(8 * 1000000000))
>     sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
> run()
> {code}
> Gives you:
> {code}
> ---------------------------------------------------------------------------
> SystemError Traceback (most recent call last)
> <ipython-input-14-53cbdb8ed528> in <module>()
> 4 sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
> 5
> ----> 6 run()
> <ipython-input-14-53cbdb8ed528> in run()
> 1 def run():
> 2 import numpy.random as nr
> ----> 3 b = sc.broadcast(nr.bytes(8 * 1000000000))
> 4 sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()
> 5
> /databricks/spark/python/pyspark/context.py in broadcast(self, value)
> 741 be sent to each cluster only once.
> 742 """
> --> 743 return Broadcast(self, value, self._pickled_broadcast_vars)
> 744
> 745 def accumulator(self, value, accum_param=None):
> /databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path)
> 68 if sc is not None:
> 69 f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
> ---> 70 self._path = self.dump(value, f)
> 71 self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
> 72 self._pickle_registry = pickle_registry
> /databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
> 76
> 77 def dump(self, value, f):
> ---> 78 pickle.dump(value, f, 2)
> 79 f.close()
> 80 return f.name
> SystemError: error return without exception set
> {code}
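> The broadcast path dies inside {{pickle.dump}} instead, and the C pickler surfaces the failure as an opaque {{SystemError}} rather than a useful message; presumably it hits the same 32-bit length limit on the 8 GB byte string. A hypothetical guard (not existing PySpark API, just a sketch of failing fast) could at least name the problem:
> {code}
> _MAX_PICKLE_STR = 2**31 - 1  # pickle protocol 2 string lengths are signed 32-bit
>
> def safe_broadcast(sc, value):
>     # Hypothetical helper: reject oversized byte strings up front with a
>     # clear message instead of letting the pickler fail cryptically.
>     if isinstance(value, bytes) and len(value) > _MAX_PICKLE_STR:
>         raise ValueError(
>             "Cannot broadcast a byte string of %d bytes: pickle protocol 2 "
>             "cannot serialize strings larger than 2 GiB." % len(value))
>     return sc.broadcast(value)
> {code}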
> In both cases, we should raise a clearer error saying that the task or broadcast variable could not be serialized, or that it was too large; a sketch of one possible shape follows.
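> For the task-serialization case, the natural place is {{CloudPickleSerializer.dumps}} (the frame in the first traceback): catch the low-level failure and re-raise with an actionable message. This is an illustration, not the actual patch, and it assumes the cloudpickle module bundled with PySpark:
> {code}
> import pickle
>
> from pyspark import cloudpickle
>
> class CloudPickleSerializer(object):  # the real class lives in pyspark/serializers.py
>     def dumps(self, obj):
>         try:
>             return cloudpickle.dumps(obj, 2)
>         except pickle.PickleError:
>             raise
>         except Exception as e:
>             # Re-raise low-level struct/System errors with a message that
>             # tells the user what actually went wrong.
>             msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, e)
>             raise pickle.PicklingError(msg)
> {code}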