Posted to dev@beam.apache.org by Thomas Weise <th...@apache.org> on 2016/06/01 04:10:20 UTC

Serialization for org.apache.beam.sdk.util.WindowedValue$*

Hi,

I'm working on putting together a basic runner for Apache Apex.

I'm hitting a couple of serialization-related issues when running tests.
Apex uses Kryo for serialization by default (and Kryo can delegate to other
serialization frameworks).

The inner classes of WindowedValue are private and have no default
constructor, which the Kryo field serializer does not like. These classes
are also not Java serializable, so that's not a fallback option (not that
it would be efficient anyway).

What's the recommended technique to move the WindowedValues over the wire?

Also, PipelineOptions aren't serializable, while most other classes are.
They are needed for example with DoFnRunnerBase, so what's the recommended
way to distribute them? Disassemble/reassemble? :)

Thanks,
Thomas

Re: Serialization for org.apache.beam.sdk.util.WindowedValue$*

Posted by Thomas Weise <th...@gmail.com>.
Amit,

Thanks for this pointer as well, CoderHelpers helps indeed!

Thomas

On Thu, Jun 2, 2016 at 12:51 PM, Amit Sela <am...@gmail.com> wrote:

> Oh sorry, of course I meant Thomas Groh in my previous email. But @Thomas
> Weise, this example
> <https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java#L108>
> might help; this is how the Spark runner uses Coders like Thomas Groh
> described.
>
> And I agree that we should consider making PipelineOptions Serializable or
> providing a generic solution for Runners.
>
> Hope this helps,
> Amit

Re: Serialization for org.apache.beam.sdk.util.WindowedValue$*

Posted by Amit Sela <am...@gmail.com>.
Oh sorry, of course I meant Thomas Groh in my previous email. But @Thomas
Weise, this example
<https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java#L108>
might help; this is how the Spark runner uses Coders like Thomas Groh
described.

And I agree that we should consider making PipelineOptions Serializable or
providing a generic solution for Runners.

Hope this helps,
Amit

On Thu, Jun 2, 2016 at 10:35 PM Amit Sela <am...@gmail.com> wrote:

> Thomas is right, though in my case, I encountered this issue when using
> Spark's new API that uses Encoders
> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala> not
> just for serialization but also for "translating" the object into a schema
> of optimized execution with Tungsten
> <https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html>.
>
> In this case I'm using Kryo, and I've solved this by registering (in Spark,
> not Beam) custom serializers from
> https://github.com/magro/kryo-serializers.
> I would consider implementing Encoders with the help of Coders in the
> future, but I haven't wrapped my mind around this yet.

Re: Serialization for org.apache.beam.sdk.util.WindowedValue$*

Posted by Amit Sela <am...@gmail.com>.
Thomas is right, though in my case I encountered this issue when using
Spark's new API, which uses Encoders
<https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala>
not just for serialization but also for "translating" the object into a
schema for optimized execution with Tungsten
<https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html>.

In this case I'm using Kryo, and I've solved this by registering (in Spark,
not Beam) custom serializers from https://github.com/magro/kryo-serializers.
I would consider implementing Encoders with the help of Coders in the
future, but I haven't wrapped my mind around this yet.

On Thu, Jun 2, 2016 at 9:59 PM Thomas Groh <tg...@google.com.invalid> wrote:

> The Beam Model ensures that all PCollections have a Coder; the PCollection
> Coder is the standard way to materialize the elements of a
> PCollection[1][2]. Most SDK-provided classes that will need to be
> transferred across the wire have an associated coder, and some additional
> default datatypes have coders associated with them (in the CoderRegistry[3]).
>
> FullWindowedValueCoder[4] is capable of encoding and decoding the entirety
> of a WindowedValue, and is constructed from a ValueCoder (obtained from the
> PCollection) and a WindowCoder (obtained from the WindowFn of the
> WindowingStrategy of the PCollection). Given an input PCollection `pc`, you
> can construct the FullWindowedValueCoder with the following code snippet
>
> ```
> FullWindowedValueCoder.of(pc.getCoder(),
> pc.getWindowingStrategy().getWindowFn().windowCoder())
> ```
>
> [1]
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
> [2]
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java#L130
> [3]
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java#L94
> [4]
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L515

Re: Serialization for org.apache.beam.sdk.util.WindowedValue$*

Posted by Thomas Weise <th...@gmail.com>.
Thanks, works like a charm! For such hidden gems there should be a Beam
runner newbie guide ;-)

Thomas


On Thu, Jun 2, 2016 at 11:59 AM, Thomas Groh <tg...@google.com.invalid>
wrote:

> The Beam Model ensures that all PCollections have a Coder; the PCollection
> Coder is the standard way to materialize the elements of a
> PCollection[1][2]. Most SDK-provided classes that will need to be
> transferred across the wire have an associated coder, and some additional
> default datatypes have coders associated with them (in the CoderRegistry[3]).
>
> FullWindowedValueCoder[4] is capable of encoding and decoding the entirety
> of a WindowedValue, and is constructed from a ValueCoder (obtained from the
> PCollection) and a WindowCoder (obtained from the WindowFn of the
> WindowingStrategy of the PCollection). Given an input PCollection `pc`, you
> can construct the FullWindowedValueCoder with the following code snippet
>
> ```
> FullWindowedValueCoder.of(pc.getCoder(),
> pc.getWindowingStrategy().getWindowFn().windowCoder())
> ```
>
> [1]
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
> [2]
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java#L130
> [3]
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java#L94
> [4]
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L515

Re: Serialization for org.apache.beam.sdk.util.WindowedValue$*

Posted by Thomas Groh <tg...@google.com.INVALID>.
The Beam Model ensures that all PCollections have a Coder; the PCollection
Coder is the standard way to materialize the elements of a
PCollection[1][2]. Most SDK-provided classes that will need to be
transferred across the wire have an associated coder, and some additional
default datatypes have coders associated with them (in the CoderRegistry[3]).

FullWindowedValueCoder[4] is capable of encoding and decoding the entirety
of a WindowedValue, and is constructed from a ValueCoder (obtained from the
PCollection) and a WindowCoder (obtained from the WindowFn of the
WindowingStrategy of the PCollection). Given an input PCollection `pc`, you
can construct the FullWindowedValueCoder with the following code snippet:

```
FullWindowedValueCoder.of(pc.getCoder(),
pc.getWindowingStrategy().getWindowFn().windowCoder())
```
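
For anyone who wants to see the shape of this composition without pulling in Beam, here is a rough JDK-only sketch. Everything in it is an assumption made up for illustration: `SimpleCoder`, `Windowed`, `windowedCoder`, `toBytes` and `fromBytes` are hypothetical stand-ins, not Beam's `Coder`, `WindowedValue` or `FullWindowedValueCoder`, and the byte layout is not Beam's wire format. It only shows how a coder for the whole windowed value can be assembled from a value coder and a window coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// JDK-only sketch. None of these types are Beam classes; they only mimic the shape.
class WindowedCoderSketch {

  /** Hypothetical stand-in for a coder: knows how to write and read one type. */
  interface SimpleCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;

    T decode(DataInputStream in) throws IOException;
  }

  /** Hypothetical stand-in for a windowed value. It has no no-arg constructor. */
  static final class Windowed<T, W> {
    final T value;
    final long timestampMillis;
    final W window;

    Windowed(T value, long timestampMillis, W window) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.window = window;
    }
  }

  static final SimpleCoder<String> STRING_CODER = new SimpleCoder<String>() {
    @Override
    public void encode(String value, DataOutputStream out) throws IOException {
      out.writeUTF(value);
    }

    @Override
    public String decode(DataInputStream in) throws IOException {
      return in.readUTF();
    }
  };

  static final SimpleCoder<Long> LONG_CODER = new SimpleCoder<Long>() {
    @Override
    public void encode(Long value, DataOutputStream out) throws IOException {
      out.writeLong(value);
    }

    @Override
    public Long decode(DataInputStream in) throws IOException {
      return in.readLong();
    }
  };

  /** Builds the composite coder from a value coder and a window coder. */
  static <T, W> SimpleCoder<Windowed<T, W>> windowedCoder(
      final SimpleCoder<T> valueCoder, final SimpleCoder<W> windowCoder) {
    return new SimpleCoder<Windowed<T, W>>() {
      @Override
      public void encode(Windowed<T, W> wv, DataOutputStream out) throws IOException {
        valueCoder.encode(wv.value, out);
        out.writeLong(wv.timestampMillis);
        windowCoder.encode(wv.window, out);
      }

      @Override
      public Windowed<T, W> decode(DataInputStream in) throws IOException {
        T value = valueCoder.decode(in);
        long timestampMillis = in.readLong();
        W window = windowCoder.decode(in);
        // The coder calls the real constructor, so no reflective instantiation is needed.
        return new Windowed<T, W>(value, timestampMillis, window);
      }
    };
  }

  /** Encodes a value to a byte array using the given coder. */
  static <T> byte[] toBytes(SimpleCoder<T> coder, T value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      coder.encode(value, out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Decodes a value from a byte array using the given coder. */
  static <T> T fromBytes(SimpleCoder<T> coder, byte[] wire) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
      return coder.decode(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    SimpleCoder<Windowed<String, Long>> coder = windowedCoder(STRING_CODER, LONG_CODER);
    byte[] wire = toBytes(coder, new Windowed<String, Long>("hello", 1464840000000L, 42L));
    Windowed<String, Long> copy = fromBytes(coder, wire);
    System.out.println(copy.value + " @ " + copy.timestampMillis + " in window " + copy.window);
  }
}
```

The point of the design is that the decoder knows which constructor to call, so the runner's serialization framework only ever has to move a byte array.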

[1]
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
[2]
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java#L130
[3]
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java#L94
[4]
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L515

On Thu, Jun 2, 2016 at 10:41 AM, Thomas Weise <th...@gmail.com>
wrote:

> Hi Amit,
>
> Thanks for the help. I implemented the same serialization workaround for
> the PipelineOptions. Since every distributed runner will have to solve
> this, would it make sense to provide the serialization support along with
> the interface proxy?
>
> Here is the exception I get with WindowedValue:
>
> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
>     at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>     at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>
> Thanks,
> Thomas

Re: Serialization for org.apache.beam.sdk.util.WindowedValue$*

Posted by Thomas Weise <th...@gmail.com>.
Hi Amit,

Thanks for the help. I implemented the same serialization workaround for
the PipelineOptions. Since every distributed runner will have to solve
this, would it make sense to provide the serialization support along with
the interface proxy?

Here is the exception I get with WindowedValue:

com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
    at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
    at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
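
The failure mode can be reproduced with plain reflection, which may help when explaining it to others. The sketch below is not Kryo's code; it assumes only that a field-based serializer has to create an empty instance before filling in fields. `TimestampedValue`, `PlainBean` and `newInstance` are hypothetical names invented for this example.

```java
import java.lang.reflect.Constructor;

// JDK-only sketch of reflective instantiation; not Kryo's actual implementation.
class InstantiationSketch {

  /** Hypothetical value class with only an all-args constructor. */
  static final class TimestampedValue {
    final String value;
    final long timestampMillis;

    TimestampedValue(String value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Hypothetical bean with an implicit no-arg constructor. */
  static final class PlainBean {
    String name;
  }

  /** Creates an empty instance the way a reflective field serializer would have to. */
  static <T> T newInstance(Class<T> type) {
    try {
      Constructor<T> ctor = type.getDeclaredConstructor();
      ctor.setAccessible(true);
      return ctor.newInstance();
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(
          "Class cannot be created (missing no-arg constructor): " + type.getName(), e);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Could not instantiate " + type.getName(), e);
    }
  }

  public static void main(String[] args) {
    // Works: the compiler generated a no-arg constructor for PlainBean.
    System.out.println(newInstance(PlainBean.class) != null);

    // Fails: TimestampedValue can only be built through its two-argument constructor.
    try {
      newInstance(TimestampedValue.class);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

A serializer that is told explicitly how to construct the class (a custom serializer, or a coder) sidesteps the problem because it never needs the empty instance.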

Thanks,
Thomas


On Wed, Jun 1, 2016 at 12:45 AM, Amit Sela <am...@gmail.com> wrote:

> Hi Thomas,
>
> Spark and the Spark runner are using Kryo for serialization and it seems to
> work just fine. What is your exact problem? Stack trace/message?
> I've hit an issue with Guava's ImmutableList/Map etc. and used
> https://github.com/magro/kryo-serializers for that.
>
> For PipelineOptions you can take a look at the Spark runner code here:
>
> https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java#L73
>
> I'd be happy to assist with Kryo.
>
> Thanks,
> Amit

Re: Serialization for org.apache.beam.sdk.util.WindowedValue$*

Posted by Amit Sela <am...@gmail.com>.
Hi Thomas,

Spark and the Spark runner are using Kryo for serialization and it seems to
work just fine. What is your exact problem? Stack trace/message?
I've hit an issue with Guava's ImmutableList/Map etc. and used
https://github.com/magro/kryo-serializers for that.

For PipelineOptions you can take a look at the Spark runner code here:
https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java#L73
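
As a rough illustration of the general "disassemble/reassemble" idea, here is a JDK-only sketch: ship a text form of the options inside a serializable holder and rebuild the real object lazily on the receiving side. This is not a copy of the Spark runner code linked above. `Options`, `SerializableOptions`, `toText`, `fromText` and `roundTrip` are hypothetical names, and using `java.util.Properties` as the text format is an assumption made to keep the example dependency-free.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// JDK-only sketch; Options is a made-up stand-in, not Beam's PipelineOptions.
class OptionsHolderSketch {

  /** Hypothetical options object that is deliberately not Serializable. */
  static final class Options {
    final Map<String, String> settings = new TreeMap<String, String>();
  }

  /** Serializable holder: only the text form travels, the options are rebuilt on demand. */
  static final class SerializableOptions implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String text;
    private transient Options options; // null again after deserialization

    SerializableOptions(Options options) {
      this.text = toText(options);
      this.options = options;
    }

    Options get() {
      if (options == null) {
        options = fromText(text);
      }
      return options;
    }
  }

  /** Disassemble: turn the options into a string. */
  static String toText(Options options) {
    Properties props = new Properties();
    props.putAll(options.settings);
    StringWriter out = new StringWriter();
    try {
      props.store(out, null);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toString();
  }

  /** Reassemble: rebuild the options from the string. */
  static Options fromText(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Options options = new Options();
    for (String name : props.stringPropertyNames()) {
      options.settings.put(name, props.getProperty(name));
    }
    return options;
  }

  /** Simulates shipping an object to another JVM with Java serialization. */
  static <T extends Serializable> T roundTrip(T object) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(object);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    Options options = new Options();
    options.settings.put("jobName", "example");
    SerializableOptions shipped = roundTrip(new SerializableOptions(options));
    System.out.println(shipped.get().settings);
  }
}
```

The same holder shape should work with any serializer that honors `transient` fields; the text format is the part a real runner would swap out.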

I'd be happy to assist with Kryo.

Thanks,
Amit

On Wed, Jun 1, 2016 at 7:10 AM Thomas Weise <th...@apache.org> wrote:

> Hi,
>
> I'm working on putting together a basic runner for Apache Apex.
>
> Hitting a couple of serialization related issues with running tests. Apex
> is using Kryo for serialization by default (and Kryo can delegate to other
> serialization frameworks).
>
> The inner classes of WindowedValue are private and have no default
> constructor, which the Kryo field serializer does not like. Also these
> classes are not Java serializable, so that's not a fallback option (not
> that it would be efficient anyways).
>
> What's the recommended technique to move the WindowedValues over the wire?
>
> Also, PipelineOptions aren't serializable, while most other classes are.
> They are needed for example with DoFnRunnerBase, so what's the recommended
> way to distribute them? Disassemble/reassemble? :)
>
> Thanks,
> Thomas
>