Posted to issues@spark.apache.org by "Parth Gandhi (JIRA)" <ji...@apache.org> on 2019/01/17 22:09:00 UTC
[jira] [Created] (SPARK-26658) Pyspark job is unable to serialize large objects
Parth Gandhi created SPARK-26658:
------------------------------------
Summary: Pyspark job is unable to serialize large objects
Key: SPARK-26658
URL: https://issues.apache.org/jira/browse/SPARK-26658
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 3.0.0
Reporter: Parth Gandhi
When a PySpark job running on Python 3 tries to serialize large objects, it fails with a pickling error (struct.error) when serializing a global variable referenced by a function, and with an OverflowError when the object is broadcast.
Global variable:
{code:java}
Traceback (most recent call last):
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 235, in dump
return Pickler.dump(self, obj)
File "/home/var/python36/lib/python3.6/pickle.py", line 409, in dump
self.save(obj)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple
save(element)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
self._batch_appends(obj)
File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
save(x)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
self._batch_appends(obj)
File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
save(x)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
self._batch_appends(obj)
File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
save(x)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
self._batch_appends(obj)
File "/home/var/python36/lib/python3.6/pickle.py", line 808, in _batch_appends
save(tmp[0])
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 372, in save_function
self.save_function_tuple(obj)
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 525, in save_function_tuple
save(f_globals)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
save(state)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
save(state)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple
save(element)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/var/python36/lib/python3.6/pickle.py", line 693, in save_bytes
(str(obj, 'latin1'), 'latin1'), obj=obj)
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 786, in save_reduce
save(args)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/var/python36/lib/python3.6/pickle.py", line 736, in save_tuple
save(element)
File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/home/var/python36/lib/python3.6/pickle.py", line 714, in save_str
self.write(BINUNICODE + pack("<I", n) + encoded)
struct.error: 'I' format requires 0 <= number <= 4294967295
{code}
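The last frames of this traceback show where it breaks: at protocol 2, pickle turns the bytes into a latin-1 str (the save_bytes frame) and writes it with the BINUNICODE opcode, whose length prefix is packed with struct format "<I", a 4-byte unsigned integer. A minimal sketch of that limit using only the standard struct module; the helper names pack_binunicode_length and fits_in_binunicode are hypothetical and not part of pickle:

```python
import struct

# Largest value a 4-byte little-endian unsigned integer ("<I") can hold.
MAX_U32 = 2**32 - 1  # 4294967295, the bound quoted in the struct.error above


def pack_binunicode_length(n: int) -> bytes:
    """Hypothetical helper: pack a payload length the way the failing frame
    does, i.e. pack("<I", n) in front of the BINUNICODE payload."""
    return struct.pack("<I", n)


def fits_in_binunicode(n: int) -> bool:
    """Return True if a payload of n bytes can carry a 4-byte length prefix."""
    try:
        pack_binunicode_length(n)
    except struct.error:
        return False
    return True


if __name__ == "__main__":
    print(len(pack_binunicode_length(MAX_U32)))  # 4: the maximum length still fits
    print(fits_in_binunicode(MAX_U32))           # True
    print(fits_in_binunicode(MAX_U32 + 1))       # False: a 4 GiB payload does not fit
```

Any string or bytes object whose encoded size reaches 4 GiB therefore cannot be written with this opcode, regardless of how much driver memory is configured.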
Broadcast:
{code:java}
Traceback (most recent call last):
File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", line 113, in dump
pickle.dump(value, f, 2)
OverflowError: cannot serialize a string larger than 4GiB
{code}
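The broadcast path hits the same ceiling: broadcast.py calls pickle.dump(value, f, 2), and the opcodes available at protocol 2 carry at most a 4-byte length. The Python documentation states that protocol 4 (added in Python 3.4) adds support for very large objects; the standard pickletools opcode table shows which protocol introduced the 8-byte-length variants. A minimal sketch, where the helper opcode_protocol is hypothetical:

```python
import pickletools

# Index the standard library's opcode table by opcode name.
OPCODES = {op.name: op for op in pickletools.opcodes}


def opcode_protocol(name: str) -> int:
    """Return the pickle protocol version that introduced the named opcode."""
    return OPCODES[name].proto


if __name__ == "__main__":
    # BINUNICODE/BINBYTES use a 4-byte length; the *8 variants use 8 bytes
    # and only exist from protocol 4 onward.
    for name in ("BINUNICODE", "BINBYTES", "BINUNICODE8", "BINBYTES8"):
        print(name, "introduced in protocol", opcode_protocol(name))
```

A pickler running at protocol 2 cannot emit the 8-byte variants, which is consistent with the "cannot serialize a string larger than 4GiB" message above.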
Steps to Reproduce:
- To reproduce the above issue, I used the word2vec model trained on the Google News dataset, downloaded from [https://drive.google.com/file/d/0B7XkCwpI5KDYNlNUTTlSS21pQmM/edit?usp=sharing]
- Use Python 3.x with the gensim module installed (or ship the module zip file using --py-files).
- Launch PySpark with the following command:
{code:java}
bin/pyspark --master yarn --py-files additionalPythonModules.zip --conf spark.driver.memory=16g --conf spark.executor.memory=16g --conf spark.driver.memoryOverhead=16g --conf spark.executor.memoryOverhead=16g --conf spark.executor.pyspark.memory=16g
{code}
To reproduce the issue with a global variable, run the following in the PySpark shell (only the relevant parts of the code are shown):
{code:java}
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/01/17 21:16:46 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/01/17 21:16:48 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.3-SNAPSHOT
/_/
Using Python version 3.6.3 (default, Jun 19 2018 22:39:11)
SparkSession available as 'spark'.
>>> import gensim
/home/y/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
RequestsDependencyWarning)
>>> import sys
>>> score_threshold = 0.65
>>> synonym_limit = 3
>>> model = gensim.models.KeyedVectors.load_word2vec_format('hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', binary=True)
>>> def isPhrase(word):
... if word.find('_') != -1 :
... return 1
... return 0
...
>>> def process_word(line):
... word = "test"
... positiveWords = []
... positiveWords.append(word)
... try :
... results = model.most_similar(positive=positiveWords)
... synonym_vec = []
... for i in range(len(results)) :
... result = results[i]
... if (result[1] > score_threshold ) :
... synonym = result[0]
... synonym = synonym.lower()
... if (isPhrase(synonym)==0) and (word != synonym) :
... synonym_vec.append(synonym)
... if len(synonym_vec) > synonym_limit :
... break
... if len(synonym_vec) > 0 :
... #print(word +"\t"+ ",".join(synonym_vec))
... return (word, ",".join(synonym_vec))
... except KeyError :
... sys.stderr.write("key error: " + word + "\n")
...
>>> if __name__ == "__main__":
... rdd = sc.parallelize(["test1", "test2", "test3"])
... rdd2 = rdd.map(process_word)
... rdd2.count()
...
{code}
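The first reproduction takes the global-variable path because process_word reads the module-level model: when cloudpickle serializes the function it also serializes the globals the function references (the save(f_globals) frame in the first traceback), pulling the entire word2vec model into the task closure. A rough sketch of how such references can be found with the standard dis module; the helper referenced_globals and the small stand-in model are hypothetical, and cloudpickle's own extraction logic differs between versions (for example, it also walks nested code objects):

```python
import dis

# Hypothetical stand-ins for the objects in the shell session above.
model = {"test": [("exam", 0.7), ("quiz", 0.6)]}  # plays the role of KeyedVectors
score_threshold = 0.65

GLOBAL_OPS = {"LOAD_GLOBAL", "STORE_GLOBAL", "DELETE_GLOBAL"}


def process_word(line):
    # Reads two module-level names, like the original process_word.
    results = []
    for word, score in model.get("test", []):
        if score > score_threshold:
            results.append(word)
    return results


def referenced_globals(func):
    """Return the module-level names that func's own bytecode touches and
    that exist in func.__globals__ (builtins such as len are skipped)."""
    names = {
        ins.argval
        for ins in dis.get_instructions(func)
        if ins.opname in GLOBAL_OPS
    }
    return {name for name in names if name in func.__globals__}


if __name__ == "__main__":
    # Both names would be pickled along with the function.
    print(sorted(referenced_globals(process_word)))  # ['model', 'score_threshold']
```

With the real session, model is the multi-gigabyte KeyedVectors object, so it ends up inside the pickled closure.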
- To reproduce the issue with broadcast, run the following in the PySpark shell:
{code:java}
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/01/17 19:31:10 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/01/17 19:31:12 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.3-SNAPSHOT
/_/
Using Python version 3.6.3 (default, Jun 19 2018 22:39:11)
SparkSession available as 'spark'.
>>> import gensim
/home/y/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
RequestsDependencyWarning)
>>> model = sc.broadcast(gensim.models.KeyedVectors.load_word2vec_format('hdfs://user/pgandhi/GoogleNews-vectors-negative300.bin', binary=True))
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)