Posted to issues@spark.apache.org by "Parth Gandhi (JIRA)" <ji...@apache.org> on 2019/01/17 22:09:00 UTC

[jira] [Created] (SPARK-26658) Pyspark job is unable to serialize large objects

Parth Gandhi created SPARK-26658:
------------------------------------

             Summary: Pyspark job is unable to serialize large objects 
                 Key: SPARK-26658
                 URL: https://issues.apache.org/jira/browse/SPARK-26658
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.0.0
            Reporter: Parth Gandhi


When a PySpark job running under Python 3 tries to serialize an object larger than 4 GiB, it fails: pickling a large object captured as a global variable raises a struct.error, while broadcasting one raises an OverflowError.
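Both errors trace back to pickle protocol 2, whose opcodes store object sizes as 4-byte unsigned integers (hence the 0 <= number <= 4294967295 bound in the first traceback below); protocol 4, available since Python 3.4, added 8-byte length opcodes. That also explains the two different exceptions: cloudpickle drives the pure-Python pickler, which fails in pack("<I", n) with a struct.error, while broadcast.py calls the C-accelerated pickle.dump, which raises OverflowError. A minimal standalone sketch of the limit, independent of Spark (illustrative only, since it needs well over 8 GiB of free memory to run):
{code:java}
# Standalone sketch of the 4 GiB limit in pickle protocols < 4.
# Needs well over 8 GiB of free memory; illustrative only.
import pickle

big = "x" * (2**32 + 1)  # one character past the 4-byte length limit

try:
    pickle.dumps(big, protocol=2)   # lengths are packed as "<I", so > 4 GiB fails
except Exception as e:
    print(type(e).__name__, e)      # struct.error (pure-Python pickler) or OverflowError (C pickler)

data = pickle.dumps(big, protocol=4)  # protocol 4 uses 8-byte length opcodes and succeeds
{code}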

global object:
{code:java}
Traceback (most recent call last):
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
    save(x)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
    save(x)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
    save(x)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/home/var/python36/lib/python3.6/pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 372, in save_function
    self.save_function_tuple(obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 525, in save_function_tuple
    save(f_globals)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
    save(state)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
    save(state)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 693, in save_bytes
    (str(obj, 'latin1'), 'latin1'), obj=obj)
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 786, in save_reduce
    save(args)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/var/python36/lib/python3.6/pickle.py", line 714, in save_str
    self.write(BINUNICODE + pack("<I", n) + encoded)
struct.error: 'I' format requires 0 <= number <= 4294967295
{code}

broadcast:
{code:java}
Traceback (most recent call last):
  File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", line 113, in dump
    pickle.dump(value, f, 2)
OverflowError: cannot serialize a string larger than 4GiB
{code}
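The frame above shows pyspark/broadcast.py hardcoding protocol 2 in pickle.dump(value, f, 2). A minimal sketch of the direction a fix could take (not the actual Spark patch): use the highest protocol the interpreter supports, which on Python 3.4+ is protocol 4 and has no 4 GiB per-object limit.
{code:java}
# Sketch only, not the actual Spark fix: pick the highest available
# pickle protocol instead of the hardcoded protocol 2.
import pickle

def dump(value, f):
    # pickle.HIGHEST_PROTOCOL is 4 on Python 3.4+, lifting the 4 GiB cap.
    pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
{code}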
Steps to Reproduce:

- To reproduce the above issue, I am using the word2vec model trained on the Google News dataset, downloaded from [https://drive.google.com/file/d/0B7XkCwpI5KDYNlNUTTlSS21pQmM/edit?usp=sharing].

- Use Python 3.x with the gensim module installed (or ship the module as a zip file using --py-files).

Launch pyspark with the following command:
{code:java}
bin/pyspark --master yarn --py-files additionalPythonModules.zip --conf spark.driver.memory=16g --conf spark.executor.memory=16g --conf spark.driver.memoryOverhead=16g --conf spark.executor.memoryOverhead=16g --conf spark.executor.pyspark.memory=16g
{code}
For the sake of reproducing the issue, I have pasted the relevant parts of the pyspark shell session here:
{code:java}
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/01/17 21:16:46 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/01/17 21:16:48 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.3-SNAPSHOT
      /_/
 
Using Python version 3.6.3 (default, Jun 19 2018 22:39:11)
SparkSession available as 'spark'.
>>> import sys
>>> import gensim
/home/y/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
  RequestsDependencyWarning)
>>> score_threshold = 0.65
>>> synonym_limit = 3
>>> model = gensim.models.KeyedVectors.load_word2vec_format('hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', binary=True)
>>> def isPhrase(word):
...     if word.find('_') != -1 :
...         return 1
...     return 0
... 
>>> def process_word(line):
...             word = "test"
...             positiveWords = []
...             positiveWords.append(word)
...             try :
...                results = model.most_similar(positive=positiveWords)
...                synonym_vec = []
...                for i in range(len(results)) :
...                   result = results[i]
...                   if (result[1] > score_threshold ) :
...                       synonym = result[0]
...                       synonym = synonym.lower()
...                       if (isPhrase(synonym)==0) and (word != synonym) :
...                           synonym_vec.append(synonym)
...                   if len(synonym_vec) > synonym_limit :
...                       break
...                if  len(synonym_vec) > 0 :
...                    #print(word +"\t"+ ",".join(synonym_vec))
...                    return (word, ",".join(synonym_vec))
...             except KeyError :
...                sys.stderr.write("key error: " + word + "\n")
... 
>>> if __name__ == "__main__":
...   rdd = sc.parallelize(["test1", "test2", "test3"])
...   rdd2 = rdd.map(process_word)
...   rdd2.count()
...
{code}
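Until this is fixed, a workaround for the global-variable case is to keep the model out of the closure entirely and load it on the executors, once per partition. A hedged sketch, assuming the model file sits at an executor-readable path (MODEL_PATH below is illustrative, not from the job above):
{code:java}
# Workaround sketch: load the model on the executors instead of letting
# cloudpickle ship the multi-GiB object from the driver.
MODEL_PATH = '/path/on/executor/GoogleNews-vectors-negative300.bin'  # assumed location

def process_partition(words):
    # Loaded once per partition; never pickled on the driver.
    import gensim
    model = gensim.models.KeyedVectors.load_word2vec_format(MODEL_PATH, binary=True)
    for word in words:
        try:
            yield (word, model.most_similar(positive=[word]))
        except KeyError:
            yield (word, None)

rdd = sc.parallelize(["test1", "test2", "test3"])
rdd.mapPartitions(process_partition).count()
{code}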
 
- To reproduce the issue with broadcast, simply run the code below in the pyspark shell:
{code:java}
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/01/17 19:31:10 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/01/17 19:31:12 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.3-SNAPSHOT
      /_/
 
Using Python version 3.6.3 (default, Jun 19 2018 22:39:11)
SparkSession available as 'spark'.
>>> import gensim
/home/y/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
  RequestsDependencyWarning)
>>> model = sc.broadcast(gensim.models.KeyedVectors.load_word2vec_format('hdfs://user/pgandhi/GoogleNews-vectors-negative300.bin', binary=True))
{code}
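The same idea sidesteps the broadcast failure: ship the model file with sc.addFile and send the executors nothing but a file name, so the driver never pickles more than a short string. A hedged sketch (the HDFS path mirrors the session above and is an assumption):
{code:java}
# Workaround sketch: distribute the model file, not the pickled object.
from pyspark import SparkFiles

sc.addFile('hdfs:///user/pgandhi/GoogleNews-vectors-negative300.bin')  # path is an assumption

def most_similar(word):
    # For real workloads, load once per partition (see the earlier sketch)
    # instead of once per record.
    import gensim
    model = gensim.models.KeyedVectors.load_word2vec_format(
        SparkFiles.get('GoogleNews-vectors-negative300.bin'), binary=True)
    try:
        return (word, model.most_similar(positive=[word]))
    except KeyError:
        return (word, None)

sc.parallelize(["test1", "test2"]).map(most_similar).count()
{code}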