You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Matt Cheah <mc...@palantir.com> on 2015/02/18 05:31:45 UTC

JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

Hi everyone,

I was using JavaPairRDD¹s combineByKey() to compute all of my aggregations
before, since I assumed that every aggregation required a key. However, I
realized I could do my analysis using JavaRDD¹s aggregate() instead and not
use a key.

I have set spark.serializer to use Kryo. As a result, JavaRDD¹s combineByKey
requires that a ³createCombiner² function is provided, and the return value
from that function must be serializable using Kryo. When I switched to using
rdd.aggregate I assumed that the zero value would also be strictly Kryo
serialized, as it is a data item and not part of a closure or the
aggregation functions. However, I got a serialization exception as the
closure serializer (only valid serializer is the Java serializer) was used
instead.

I was wondering the following:
1. What is the rationale for making the zero value be serialized using the
closure serializer? This isn¹t part of the closure, but is an initial data
item.
2. Would it make sense for us to perhaps write a version of rdd.aggregate()
that takes a function as a parameter, that generates the zero value? This
would be more intuitive to be serialized using the closure serializer.
I believe aggregateByKey is also affected.

Thanks,

-Matt Cheah



Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

Posted by Sean Owen <so...@cloudera.com>.
The serializer is created with

val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)

Which is definitely not the closure serializer and so should respect
what you are setting with spark.serializer.

Maybe you can do a quick bit of debugging to see where that assumption
breaks down? like are you sure spark.serializer is set everywhere?

On Wed, Feb 18, 2015 at 4:31 AM, Matt Cheah <mc...@palantir.com> wrote:
> Hi everyone,
>
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD’s aggregate() instead and not
> use a key.
>
> I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey
> requires that a “createCombiner” function is provided, and the return value
> from that function must be serializable using Kryo. When I switched to using
> rdd.aggregate I assumed that the zero value would also be strictly Kryo
> serialized, as it is a data item and not part of a closure or the
> aggregation functions. However, I got a serialization exception as the
> closure serializer (only valid serializer is the Java serializer) was used
> instead.
>
> I was wondering the following:
>
> What is the rationale for making the zero value be serialized using the
> closure serializer? This isn’t part of the closure, but is an initial data
> item.
> Would it make sense for us to perhaps write a version of rdd.aggregate()
> that takes a function as a parameter, that generates the zero value? This
> would be more intuitive to be serialized using the closure serializer.
>
> I believe aggregateByKey is also affected.
>
> Thanks,
>
> -Matt Cheah

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

Posted by Reynold Xin <rx...@databricks.com>.
Yes, that's a bug and should be using the standard serializer.

On Wed, Feb 18, 2015 at 2:58 PM, Sean Owen <so...@cloudera.com> wrote:

> That looks, at the least, inconsistent. As far as I know this should
> be changed so that the zero value is always cloned via the non-closure
> serializer. Any objection to that?
>
> On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah <mc...@palantir.com> wrote:
> > But RDD.aggregate() has this code:
> >
> >     // Clone the zero value since we will also be serializing it as part
> of
> > tasks
> >     var jobResult = Utils.clone(zeroValue,
> > sc.env.closureSerializer.newInstance())
> >
> > I do see the SparkEnv.get.serializer used in aggregateByKey however.
> Perhaps
> > we just missed it and need to apply the change to aggregate()? It seems
> > appropriate to target a fix for 1.3.0.
> >
> > -Matt Cheah
> > From: Josh Rosen <ro...@gmail.com>
> > Date: Wednesday, February 18, 2015 at 6:12 AM
> > To: Matt Cheah <mc...@palantir.com>
> > Cc: "dev@spark.apache.org" <de...@spark.apache.org>, Mingyu Kim
> > <mk...@palantir.com>, Andrew Ash <aa...@palantir.com>
> > Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero
> value
> > reasoning?
> >
> > It looks like this was fixed in
> > https://issues.apache.org/jira/browse/SPARK-4743 /
> > https://github.com/apache/spark/pull/3605.  Can you see whether that
> patch
> > fixes this issue for you?
> >
> >
> >
> > On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah <mc...@palantir.com> wrote:
> >>
> >> Hi everyone,
> >>
> >> I was using JavaPairRDD’s combineByKey() to compute all of my
> aggregations
> >> before, since I assumed that every aggregation required a key. However,
> I
> >> realized I could do my analysis using JavaRDD’s aggregate() instead and
> not
> >> use a key.
> >>
> >> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
> >> combineByKey requires that a “createCombiner” function is provided, and
> the
> >> return value from that function must be serializable using Kryo. When I
> >> switched to using rdd.aggregate I assumed that the zero value would
> also be
> >> strictly Kryo serialized, as it is a data item and not part of a
> closure or
> >> the aggregation functions. However, I got a serialization exception as
> the
> >> closure serializer (only valid serializer is the Java serializer) was
> used
> >> instead.
> >>
> >> I was wondering the following:
> >>
> >> What is the rationale for making the zero value be serialized using the
> >> closure serializer? This isn’t part of the closure, but is an initial
> data
> >> item.
> >> Would it make sense for us to perhaps write a version of rdd.aggregate()
> >> that takes a function as a parameter, that generates the zero value?
> This
> >> would be more intuitive to be serialized using the closure serializer.
> >>
> >> I believe aggregateByKey is also affected.
> >>
> >> Thanks,
> >>
> >> -Matt Cheah
> >
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> For additional commands, e-mail: dev-help@spark.apache.org
>
>

Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

Posted by Sean Owen <so...@cloudera.com>.
That looks, at the least, inconsistent. As far as I know this should
be changed so that the zero value is always cloned via the non-closure
serializer. Any objection to that?

On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah <mc...@palantir.com> wrote:
> But RDD.aggregate() has this code:
>
>     // Clone the zero value since we will also be serializing it as part of
> tasks
>     var jobResult = Utils.clone(zeroValue,
> sc.env.closureSerializer.newInstance())
>
> I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
> we just missed it and need to apply the change to aggregate()? It seems
> appropriate to target a fix for 1.3.0.
>
> -Matt Cheah
> From: Josh Rosen <ro...@gmail.com>
> Date: Wednesday, February 18, 2015 at 6:12 AM
> To: Matt Cheah <mc...@palantir.com>
> Cc: "dev@spark.apache.org" <de...@spark.apache.org>, Mingyu Kim
> <mk...@palantir.com>, Andrew Ash <aa...@palantir.com>
> Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value
> reasoning?
>
> It looks like this was fixed in
> https://issues.apache.org/jira/browse/SPARK-4743 /
> https://github.com/apache/spark/pull/3605.  Can you see whether that patch
> fixes this issue for you?
>
>
>
> On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah <mc...@palantir.com> wrote:
>>
>> Hi everyone,
>>
>> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
>> before, since I assumed that every aggregation required a key. However, I
>> realized I could do my analysis using JavaRDD’s aggregate() instead and not
>> use a key.
>>
>> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
>> combineByKey requires that a “createCombiner” function is provided, and the
>> return value from that function must be serializable using Kryo. When I
>> switched to using rdd.aggregate I assumed that the zero value would also be
>> strictly Kryo serialized, as it is a data item and not part of a closure or
>> the aggregation functions. However, I got a serialization exception as the
>> closure serializer (only valid serializer is the Java serializer) was used
>> instead.
>>
>> I was wondering the following:
>>
>> What is the rationale for making the zero value be serialized using the
>> closure serializer? This isn’t part of the closure, but is an initial data
>> item.
>> Would it make sense for us to perhaps write a version of rdd.aggregate()
>> that takes a function as a parameter, that generates the zero value? This
>> would be more intuitive to be serialized using the closure serializer.
>>
>> I believe aggregateByKey is also affected.
>>
>> Thanks,
>>
>> -Matt Cheah
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

Posted by Matt Cheah <mc...@palantir.com>.
But RDD.aggregate() has this code:

    // Clone the zero value since we will also be serializing it as part of
tasks
    var jobResult = Utils.clone(zeroValue,
sc.env.closureSerializer.newInstance())

I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
we just missed it and need to apply the change to aggregate()? It seems
appropriate to target a fix for 1.3.0.

-Matt Cheah
From:  Josh Rosen <ro...@gmail.com>
Date:  Wednesday, February 18, 2015 at 6:12 AM
To:  Matt Cheah <mc...@palantir.com>
Cc:  "dev@spark.apache.org" <de...@spark.apache.org>, Mingyu Kim
<mk...@palantir.com>, Andrew Ash <aa...@palantir.com>
Subject:  Re: JavaRDD Aggregate initial value - Closure-serialized zero
value reasoning?

It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743
<https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira
_browse_SPARK-2D4743&d=AwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&
r=hzwIMNQ9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=HsNLIeID8mKWH68HoNyb_x4jS5D3
WSrjQQZX1rW_e9w&s=lOqRteYjf7RRl41OfKvkfh7IaSs3wIW643Fz_Iwlekc&e=>  /
https://github.com/apache/spark/pull/3605
<https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_spar
k_pull_3605&d=AwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ
9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=HsNLIeID8mKWH68HoNyb_x4jS5D3WSrjQQZX1
rW_e9w&s=60tyF-5TbJyVlh7upvFFhNbxKFhh9bUCWJMp5D2wUN8&e=> .  Can you see
whether that patch fixes this issue for you?



On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah <mc...@palantir.com> wrote:
> Hi everyone,
> 
> I was using JavaPairRDD¹s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD¹s aggregate() instead and not
> use a key.
> 
> I have set spark.serializer to use Kryo. As a result, JavaRDD¹s combineByKey
> requires that a ³createCombiner² function is provided, and the return value
> from that function must be serializable using Kryo. When I switched to using
> rdd.aggregate I assumed that the zero value would also be strictly Kryo
> serialized, as it is a data item and not part of a closure or the aggregation
> functions. However, I got a serialization exception as the closure serializer
> (only valid serializer is the Java serializer) was used instead.
> 
> I was wondering the following:
> 1. What is the rationale for making the zero value be serialized using the
> closure serializer? This isn¹t part of the closure, but is an initial data
> item.
> 2. Would it make sense for us to perhaps write a version of rdd.aggregate()
> that takes a function as a parameter, that generates the zero value? This
> would be more intuitive to be serialized using the closure serializer.
> I believe aggregateByKey is also affected.
> 
> Thanks,
> 
> -Matt Cheah




Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

Posted by Josh Rosen <ro...@gmail.com>.
It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605.  Can you see whether that patch
fixes this issue for you?



On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah <mc...@palantir.com> wrote:

> Hi everyone,
>
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD’s aggregate() instead and not
> use a key.
>
> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
> combineByKey requires that a “createCombiner” function is provided, and the
> return value from that function must be serializable using Kryo. When I
> switched to using rdd.aggregate I assumed that the zero value would also be
> strictly Kryo serialized, as it is a data item and not part of a closure or
> the aggregation functions. However, I got a serialization exception as the
> closure serializer (only valid serializer is the Java serializer) was used
> instead.
>
> I was wondering the following:
>
>    1. What is the rationale for making the zero value be serialized using
>    the closure serializer? This isn’t part of the closure, but is an initial
>    data item.
>    2. Would it make sense for us to perhaps write a version of
>    rdd.aggregate() that takes a function as a parameter, that generates the
>    zero value? This would be more intuitive to be serialized using the closure
>    serializer.
>
> I believe aggregateByKey is also affected.
>
> Thanks,
>
> -Matt Cheah
>