Posted to user@spark.apache.org by madeleine <ma...@gmail.com> on 2014/06/16 01:49:09 UTC

pyspark serializer can't handle functions?

It seems that the default serializer used by pyspark can't serialize a list
of functions.
I've seen some posts about trying to fix this by using dill to serialize
rather than pickle. 
Does anyone know what the status of that project is, or whether there's
another easy workaround?

I've pasted a sample error message below. Here, regs is a function defined
in another file, myfile.py, which has been included on all workers via the
pyFiles argument to SparkContext: sc = SparkContext("local", "myapp",
pyFiles=["myfile.py"]).

  File "runfile.py", line 45, in __init__
    regsRDD = sc.parallelize([regs]*self.n)
  File "/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/context.py",
line 223, in parallelize
    serializer.dump_stream(c, tempFile)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
182, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
118, in dump_stream
    self._write_with_length(obj, stream)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
128, in _write_with_length
    serialized = self.dumps(obj)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
270, in dumps
    def dumps(self, obj): return cPickle.dumps(obj, 2)
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup
__builtin__.function failed




Re: pyspark serializer can't handle functions?

Posted by Matei Zaharia <ma...@gmail.com>.
Ah, I see, interesting. CloudPickle is slower than the cPickle library, which is why we didn’t use it for data, but it should be possible to write a Serializer that uses it. Another thing you can do for this use case, though, is to define a class that represents your functions:

class MyFunc(object):
    def __call__(self, argument):
        # put the body of your function here
        return argument

f = MyFunc()   # instances of MyFunc pickle with the standard serializer

f(5)           # and can be called just like a function

Instances of a class like this should be picklable using the standard pickle serializer, though you may have to put the class in a separate .py file and include that in the list of .py files passed to SparkContext. You can then still use the instances as functions in your code.
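
For your regsRDD example, here is a rough, untested sketch of what that might look like (Reg is a made-up name, and its body just stands in for whatever regs actually computes):

class Reg(object):
    # Put this in myfile.py (shipped via pyFiles) so pickle can find the class on the workers.
    def __call__(self, z):
        return z * z   # placeholder for the real regs logic

regsRDD = sc.parallelize([Reg()] * n)   # instead of sc.parallelize([regs] * n)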

Matei



Re: pyspark serializer can't handle functions?

Posted by madeleine <ma...@gmail.com>.
Interesting! I'm curious why you use cloudpickle internally, but then use
standard pickle to serialize RDDs?

I'd like to create an RDD of functions because (I think) it's the most
natural way to express my problem. I have a matrix of functions; I'm trying
to find a low-rank matrix that minimizes the sum of these functions
evaluated on the entries of the low-rank matrix. For example, the problem
is PCA on the matrix A when the (i,j)th function is lambda z: (z-A[i,j])^2.
In general, each of these functions is defined using a two-argument base
function lambda a,z: (z-a)^2 and the data A[i,j], but it's somewhat cleaner
to express the minimization problem just in terms of the one-argument
functions.
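
For concreteness, here's a rough, untested sketch of what I mean, wrapping
each entry's data in a small picklable callable (the names are just
illustrative):

class QuadLoss(object):
    # One-argument loss built from the two-argument base and the data A[i,j].
    # This would live in a .py file shipped via pyFiles so pickle can find it on the workers.
    def __init__(self, a):
        self.a = a
    def __call__(self, z):
        return (z - self.a) ** 2

A = [[1.0, 2.0], [3.0, 4.0]]                      # stand-in for my data matrix
losses = [QuadLoss(a) for row in A for a in row]  # matrix of functions, flattened
lossRDD = sc.parallelize(losses)                  # sc as in my original post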

One other wrinkle is that I'm using alternating minimization, so I'll be
minimizing over the rows and columns of this matrix at alternating steps;
hence I need to store both the matrix and its transpose to avoid data
thrashing.





-- 
Madeleine Udell
PhD Candidate in Computational and Mathematical Engineering
Stanford University
www.stanford.edu/~udell





Re: pyspark serializer can't handle functions?

Posted by Matei Zaharia <ma...@gmail.com>.
It’s true that it can’t. You can try to use the CloudPickle library instead, which is what we use within PySpark to serialize functions (see python/pyspark/cloudpickle.py). However I’m also curious, why do you need an RDD of functions?
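
For example, here's a rough, untested sketch of one workaround along those lines: serialize the functions yourself with the bundled cloudpickle module (assuming it exposes a dumps() function) and parallelize the resulting byte strings, rebuilding the functions on the workers with the standard pickle loader:

from pyspark import cloudpickle   # the copy bundled with PySpark
import cPickle

funcs = [lambda z: z + 1, lambda z: z * 2]              # example list of plain functions
blobs = [cloudpickle.dumps(f, 2) for f in funcs]        # serialize by value with cloudpickle
blobRDD = sc.parallelize(blobs)                         # byte strings pickle fine with the default serializer
applied = blobRDD.map(lambda b: cPickle.loads(b)(5))    # rebuild each function on the worker and call it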

Matei
