Posted to user@spark.apache.org by Sandy Ryza <sa...@cloudera.com> on 2014/11/10 10:01:00 UTC

closure serialization behavior driving me crazy

I'm experiencing some strange behavior with closure serialization that is
totally mind-boggling to me.  It appears that two arrays of equal size take
up vastly different amounts of space inside closures if they're generated in
different ways.

The basic flow of my app is to run a bunch of tiny regressions using
Commons Math's OLSMultipleLinearRegression and then reference a 2D array of
the results from a transformation.  I was running into OOMEs and
NotSerializableExceptions and tried to get closer to the root issue by
calling the closure serializer directly.
  scala> val arr = models.map(_.estimateRegressionParameters()).toArray

The result array is 1867 x 5.  Serialized on its own, it is about 80K
bytes, which seems about right:
  scala> SparkEnv.get.closureSerializer.newInstance().serialize(arr)
  res17: java.nio.ByteBuffer = java.nio.HeapByteBuffer[pos=0 lim=80027
cap=80027]
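
(For context, models is built roughly along these lines -- this is only a
sketch, with placeholder data standing in for my real inputs, and it
assumes commons-math3:)

  import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression

  // 1867 tiny regressions, each with 4 predictors, so each model estimates
  // 5 parameters (4 coefficients plus the intercept)
  val models = (0 until 1867).map { _ =>
    val m = new OLSMultipleLinearRegression()
    val y = Array.fill(50)(math.random)                  // 50 fake observations
    val x = Array.fill(50)(Array.fill(4)(math.random))
    m.newSampleData(y, x)
    m
  }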

If I reference it from a simple function:
  scala> def func(x: Long) = arr.length
  scala> SparkEnv.get.closureSerializer.newInstance().serialize(func _)
I get a NotSerializableException.

If I take pains to create the array using a loop:
  scala> val arr = Array.ofDim[Double](1867, 5)
  scala> for (s <- 0 until models.length) {
  | arr(s) = models(s).estimateRegressionParameters()
  | }
Serialization works, but the serialized closure for the function is a
whopping 400MB.

If I pass in an array of the same length that was created in a different
way, the size of the serialized closure is only about 90K, which seems
about right.

Naively, it seems like somehow the history of how the array was created is
having an effect on what happens to it inside a closure.

Is this expected behavior?  Can anybody explain what's going on?

any insight very appreciated,
Sandy

Re: closure serialization behavior driving me crazy

Posted by Sandy Ryza <sa...@cloudera.com>.
I tried turning on the extended debug info.  The Scala output is a little
opaque (lots of lines like `- field (class "$iwC$$iwC$$iwC$$iwC$$iwC$$iwC",
name: "$iw", type: "class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC")`), but it
seems like, as expected, the full array of OLSMultipleLinearRegression
objects is somehow getting pulled in.
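
(For reference, I passed the flag through the shell's driver options --
something like the line below, though the exact way to set it may vary by
version:)

  spark-shell --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true"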

I'm not sure I understand your comment about Array.ofDim being large.  When
serializing the array alone, it only takes up about 80K, which is close to
1867*5*sizeof(double).  The 400MB comes when referencing the array from a
function, which pulls in all the extra data.

Copying the global variable into a local one seems to work.  Much
appreciated, Matei.


On Mon, Nov 10, 2014 at 9:26 PM, Matei Zaharia <ma...@gmail.com>
wrote:

> Hey Sandy,
>
> Try using the -Dsun.io.serialization.extendedDebugInfo=true flag on the
> JVM to print the contents of the objects. In addition, something else that
> helps is to do the following:
>
> {
>   val _arr = arr
>   models.map(... _arr ...)
> }
>
> Basically, copy the global variable into a local one. Then the field
> access from outside (from the interpreter-generated object that contains
> the line initializing arr) is no longer required, and the closure no longer
> has a reference to that.
>
> I'm really confused as to why Array.ofDim would be so large by the way,
> but are you sure you haven't flipped around the dimensions (e.g. it should
> be 5 x 1800)? A 5-double array will consume more than 5*8 bytes (probably
> something like 60 at least), and an array of those will still have a
> pointer to each one, so I'd expect that many of them to add up to more
> than 80 KB (which is very close to 1867*5*8).
>
> Matei

Re: closure serialization behavior driving me crazy

Posted by Matei Zaharia <ma...@gmail.com>.
Hey Sandy,

Try using the -Dsun.io.serialization.extendedDebugInfo=true flag on the JVM to print the contents of the objects. In addition, something else that helps is to do the following:

{
  val _arr = arr
  models.map(... _arr ...)
}

Basically, copy the global variable into a local one. Then the field access from outside (from the interpreter-generated object that contains the line initializing arr) is no longer required, and the closure no longer has a reference to that.
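
Applied to your example, it would look roughly like this (just a sketch --
read "rdd" as whatever you're actually mapping over):

{
  // copy the interpreter "global" into a local val first
  val localArr = arr
  // the closure now captures only localArr, not the whole interpreter line object
  rdd.map(x => localArr.length)
}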

I'm really confused as to why Array.ofDim would be so large by the way, but are you sure you haven't flipped around the dimensions (e.g. it should be 5 x 1800)? A 5-double array will consume more than 5*8 bytes (probably something like 60 at least), and an array of those will still have a pointer to each one, so I'd expect that many of them to add up to more than 80 KB (which is very close to 1867*5*8).

Matei

Re: closure serialization behavior driving me crazy

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Sandy,

On Mon, Nov 10, 2014 at 6:01 PM, Sandy Ryza <sa...@cloudera.com> wrote:
>
> The result array is 1867 x 5.  Serialized on its own, it is about 80K
> bytes, which seems about right:
>   scala> SparkEnv.get.closureSerializer.newInstance().serialize(arr)
>   res17: java.nio.ByteBuffer = java.nio.HeapByteBuffer[pos=0 lim=80027
> cap=80027]
>
> If I reference it from a simple function:
>   scala> def func(x: Long) = arr.length
>   scala> SparkEnv.get.closureSerializer.newInstance().serialize(func _)
> I get a NotSerializableException.
>

(Are you in fact doing this from the Scala shell? Maybe try it from
compiled code.)

I may be wrong on that, but my understanding is that a "def" is a lot
different from a "val" (holding a function) in terms of serialization.  A
"def" (such as yours defined above) that is a method of a class does not
have a class file of its own, so it must be serialized with the whole
object that it belongs to (which can fail easily if you reference a
SparkContext etc.).
A "val" (such as `val func: Long => Long = x => arr.length`) will be
compiled into a class file of its own, so only that (pretty small) function
object will have to be serialized.  That may explain why some functions are
serializable while others with the same content aren't, and also the
difference in size.
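
An untested sketch along those lines, using the names from your mail (and
also copying arr into a local val first, as suggested elsewhere in the
thread, so the function value really only closes over the array):

  val arr = Array.ofDim[Double](1867, 5)

  // A def becomes a method of the interpreter-generated wrapper object, so
  // eta-expanding it into a function value drags that whole object (and
  // everything it references) along when serialized:
  def funcDef(x: Long) = arr.length

  // A function value only has to carry what it closes over; with a local
  // copy, that is just the array itself:
  val funcVal: Long => Int = { val localArr = arr; x => localArr.length }

  SparkEnv.get.closureSerializer.newInstance().serialize(funcVal)   // roughly the size of arr
  SparkEnv.get.closureSerializer.newInstance().serialize(funcDef _) // tries to pull in the wrapper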

Tobias