Posted to user@beam.apache.org by Antony Mayi <an...@yahoo.com> on 2017/03/24 07:03:11 UTC

guava collections and kryo under spark runner

Hi,
I am using Guava's immutable collections (from Guava 21.0) in my Beam pipelines, and when running on the Spark runner the job fails because Kryo is unable to serialize them. I can see there have been some approaches addressing this using de.javakaffee.kryo-serializers -> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java but that was removed recently.
How should I solve this?
The stack trace is below.
Thanks,
Antony.

[WARNING] java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
fields (my.pkg.types.MyType)
value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
    at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
    at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
    at my.pkg.Main.main(Main.java:33)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:745)
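For context, the removed registrator registered de.javakaffee's Guava serializers with Spark's Kryo. A rough sketch of re-adding that in user code, assuming the de.javakaffee:kryo-serializers artifact is on the classpath (the class name here is mine, not Beam's):

    import com.esotericsoftware.kryo.Kryo;
    import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
    import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
    import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
    import org.apache.spark.serializer.KryoRegistrator;

    // Hypothetical user-side workaround: register serializers for Guava's
    // immutable collections with Spark's Kryo instance, mirroring what the
    // removed BeamSparkRunnerRegistrator used to do.
    public class GuavaKryoRegistrator implements KryoRegistrator {
      @Override
      public void registerClasses(Kryo kryo) {
        ImmutableListSerializer.registerSerializers(kryo);
        ImmutableSetSerializer.registerSerializers(kryo);
        ImmutableMapSerializer.registerSerializers(kryo);
      }
    }

It would then presumably be wired in via the Spark config, e.g. spark.kryo.registrator=my.pkg.GuavaKryoRegistrator (package name illustrative).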

Re: guava collections and kryo under spark runner

Posted by Aviem Zur <av...@gmail.com>.
Update: We've opened a ticket to track this issue and we're working on a
fix.
Please follow https://issues.apache.org/jira/browse/BEAM-1810 for updates.

Thanks again for bringing this issue to our attention!


Re: guava collections and kryo under spark runner

Posted by Aviem Zur <av...@gmail.com>.
Oh yes, I see your second version now; that indeed reproduces the issue,
thanks!
I'll update the gist to include this change.


Re: guava collections and kryo under spark runner

Posted by Antony Mayi <an...@yahoo.com>.
Hi Aviem,
Apologies for the confusion - did you see my second version of the file I sent shortly after the first one? That second one had the Row class included (using just "implements Serializable").
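Roughly, it looks like this (a sketch, not the exact file; the Guava field is illustrative of what the class holds):

    import com.google.common.collect.ImmutableList;
    import java.io.Serializable;

    // Sketch of the Row POJO from the second version: a plain Serializable
    // class with no Beam coder registered. The immutable field is
    // illustrative of the kind of member Kryo trips over.
    public class Row implements Serializable {
      private final ImmutableList<String> fields;

      public Row(ImmutableList<String> fields) {
        this.fields = fields;
      }

      public ImmutableList<String> getFields() {
        return fields;
      }
    }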
Thanks,
a.


Re: guava collections and kryo under spark runner

Posted by Aviem Zur <av...@gmail.com>.
Hi Antony,

Thanks for sharing your code!

I created a test that uses your exact pipeline. I couldn't find the `Row`
class referred to in your pipeline, so I created it as a POJO and registered
its coder as `AvroCoder`.

Unfortunately, this test passes, so it does not reproduce the issue you are
experiencing.
Please find the test in the following gist:
https://gist.github.com/aviemzur/4ef08e440f989b29cb6f890ddf1f7e12

Can you try to tweak it to be more like your use case in which you hit the
exception?
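For reference, the POJO registration in the test looks roughly like this (a sketch; the real gist may differ, and the field is illustrative):

    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.coders.DefaultCoder;

    // Sketch of the test's Row POJO: the annotation tells Beam to encode
    // instances with AvroCoder rather than leaving serialization to the
    // runner. The field is illustrative.
    @DefaultCoder(AvroCoder.class)
    public class Row {
      private String name;

      Row() {} // AvroCoder requires a no-arg constructor

      Row(String name) {
        this.name = name;
      }
    }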


Re: guava collections and kryo under spark runner

Posted by Antony Mayi <an...@yahoo.com>.
Sorry, wrong version of the file; now corrected.
a.


Re: guava collections and kryo under spark runner

Posted by Antony Mayi <an...@yahoo.com>.
Hi Aviem,
It took me a while to narrow it down to a simple reproducible case, but here it is. The problem appears to be related to Combine.globally(). Attached is my demo code showing the error.
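The shape of the demo is roughly the following (a simplified sketch, not the exact attachment; input values and combine logic are illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.SerializableFunction;

    // Simplified sketch of the demo: Combine.globally() over a tiny input.
    // On the Spark runner this is where the combined values end up going
    // through Kryo. Input and combine logic are illustrative.
    public class CombineDemo {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of(1, 2, 3))
            .apply(Combine.globally(
                (SerializableFunction<Iterable<Integer>, Integer>) xs -> {
                  int sum = 0;
                  for (int x : xs) {
                    sum += x;
                  }
                  return sum;
                }));
        p.run().waitUntilFinish();
      }
    }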
Thanks,
a.


Re: guava collections and kryo under spark runner

Posted by Aviem Zur <av...@gmail.com>.
Hi Antony.

Spark uses serializers to serialize data; however, this clashes with Beam's
concept of coders, so we should be using coders instead of Spark's
serializer. (Specifically, in our configuration, Kryo is used as Spark's
serializer.)

From your stack trace it seems that Kryo is being used to serialize your
class my.pkg.types.MyType. This shouldn't happen.
My guess is that we are accidentally using Spark's serializer (Kryo)
somewhere instead of coders.

If you share your pipeline (feel free to redact anything pertaining to your
organization) it will help us locate where this issue is happening.
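In the meantime, one way to make the coder explicit on a given PCollection is something like the following sketch (MyType stands in for my.pkg.types.MyType, and SerializableCoder is just one possible coder choice):

    import java.io.Serializable;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.values.PCollection;

    // Sketch: pinning an explicit Beam coder on a PCollection so element
    // serialization goes through Beam's coders rather than the runner's
    // serializer. MyType stands in for my.pkg.types.MyType.
    public class CoderHints {
      static <T extends Serializable> PCollection<T> withExplicitCoder(
          PCollection<T> pc, Class<T> clazz) {
        return pc.setCoder(SerializableCoder.of(clazz));
      }
    }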



Re: guava collections and kryo under spark runner

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
OK, after discussing with Aviem: the problem is that Kryo is not able to
serialize Guava collections (it's a known issue).
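In a nutshell: Kryo's default collection handling rebuilds a collection by instantiating it and calling add(), which Guava's immutable collections reject. A minimal illustration of the exception from the trace (illustration only, not the actual failing code path):

    import com.google.common.collect.ImmutableList;
    import java.util.List;

    // Minimal illustration: mutating a Guava immutable collection throws
    // the same java.lang.UnsupportedOperationException that appears in the
    // Kryo serialization trace above.
    public class ImmutableAddDemo {
      public static void main(String[] args) {
        List<String> list = ImmutableList.of("a", "b");
        list.add("c"); // throws java.lang.UnsupportedOperationException
      }
    }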

The question is why Kryo wants to serialize the collections at all (it could
be related to a change in the Windowing code).

Aviem and I are taking a look at that.

Regards
JB


Re: guava collections and kryo under spark runner

Posted by Antony Mayi <an...@yahoo.com>.
I am on 0.6.0.
Thx,
a.


Re: guava collections and kryo under spark runner

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Antony,

Which Beam version are you using? We made some improvements to Guava shading
recently, and I wanted to check whether this is related.

Regards
JB


-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com