Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2017/02/20 08:38:44 UTC
[jira] [Resolved] (SPARK-15679) Passing functions do not work in classes
[ https://issues.apache.org/jira/browse/SPARK-15679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-15679.
----------------------------------
Resolution: Invalid
Your test code does not seem to run as is. I modified it manually and could reproduce the issue as below:
{code}
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local")
sqlContext = SQLContext(sc)

class A(object):
    def myFunc(self, s):
        return s

    def run(self):
        data = [("a",), ("b",), ("c",)]
        df = sqlContext.createDataFrame(data)
        return df.rdd

if __name__ == "__main__":
    print A().run().count()
    # Fails: cloudpickle tries to serialize sqlContext along with A
    print A().run().map(lambda l: A().myFunc).count()
{code}
In this case, it seems cloudpickle tries to serialize {{sqlContext}} along with {{A()}}, and {{sqlContext}} does not appear to be serializable:
{code}
>>> import cPickle
>>>
>>> cPickle.dumps(sqlContext)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/copy_reg.py", line 84, in _reduce_ex
    dict = getstate()
TypeError: 'JavaPackage' object is not callable
{code}
It works once you move {{run}} or {{myFunc}} out of the class. Generally, to my knowledge, bound methods are not picklable (at least not by the built-in pickle module in Python 2), so passing them to Spark is not recommended.
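As a simplified, Spark-free analogue of this failure, here is a minimal sketch using only the Python standard library. It assumes that a {{threading.Lock}} is a fair stand-in for the unpicklable {{sqlContext}}; {{Holder}}, {{my_func}} and {{try_pickle}} are hypothetical names. cloudpickle's real path (serializing the class together with the globals it references) differs in detail, but the principle is the same: everything reachable from the function you ship must be picklable, and a bound method drags its instance along.

```python
import pickle
import threading


class Holder(object):
    """Hypothetical stand-in for A: holds a live, unpicklable handle."""

    def __init__(self):
        # A lock cannot be pickled; it plays the role of sqlContext here.
        self.ctx = threading.Lock()

    def my_func(self, s):
        return s


def my_func(s):
    """Module-level function: pickled by reference, with no instance attached."""
    return s


def try_pickle(obj):
    """Return True if obj pickles with the standard library, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


if __name__ == "__main__":
    # The bound method carries its instance, so the lock must be pickled too.
    print(try_pickle(Holder().my_func))  # False
    # The plain function carries nothing else.
    print(try_pickle(my_func))  # True
```

Moving the per-record logic into a module-level function, as suggested above, keeps the live context out of what gets serialized.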
I don't think making {{sqlContext}} picklable makes sense.
Please reopen this if I am mistaken.
> Passing functions do not work in classes
> ----------------------------------------
>
> Key: SPARK-15679
> URL: https://issues.apache.org/jira/browse/SPARK-15679
> Project: Spark
> Issue Type: Bug
> Reporter: nalin garg
>
> {code}
> from pyspark import SparkContext
> from pyspark.sql import SQLContext, HiveContext
> import csv, io, StringIO
> from pyspark.sql.functions import *
> from pyspark.sql import Row
> from pyspark.sql import *
> from pyspark.sql import functions as F
> from pyspark.sql.functions import asc, desc
> sc = SparkContext("local", "Summary Report")
> sqlContext = SQLContext(sc)
> class ParallelBuild(object):
>     def myFunc(self, s):
>         l = s.split(',')
>         print l[0], l[1]
>         return l[0]
>     def list_to_csv_str(x):
>         output = StringIO.StringIO("")
>         csv.writer(output).writerow(x)
>         return output.getvalue().strip()
>     def run(self):
>         data = [(1,'a'), (1,'b'), (1,'c'), (2,'d'), (3,'r'), (4,'a'), (2,'t'), (3,'y'), (1,'f')]
>         df = sqlContext.createDataFrame(data, schema=['uid', 'address_uid'])
>         return df
> if __name__ == "__main__":
>     df_rdd = ParallelBuild().run().map(lambda line: line).persist()
>     r = df_rdd.map(ParallelBuild().myFunc)
>     r.count()
> {code}
> The above code raises a "'JavaPackage' object is not callable" exception.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)