Posted to user@beam.apache.org by Carlos Alonso <ca...@mrcalonso.com> on 2018/01/19 12:43:09 UTC

Trying to understand Unable to encode element exceptions

Hi everyone!!

I'm building a pipeline to store items from a Google PubSub subscription
into GCS buckets. To do this I'm using both stateful and timely processing.
After building and testing the project locally, I tried to run it on Google
Dataflow and started getting "Unable to encode element" exceptions.

The full stack trace is here: https://pastebin.com/LqecPhsq

The item I'm trying to serialize is a KV[String, MessageWithAttributes],
where MessageWithAttributes is a case class defined as (content: String,
attrs: Map[String, String]).

The underlying cause is java.io.NotSerializableException:
com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's Scio as
well), which suggests that the problem is serializing the Map, but to be
honest, I don't know what it means or how to fix it.

Can anyone help me, please?
Thanks!
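
To illustrate why this fails once a SerializableCoder is involved, here is a minimal plain-Scala sketch (no Beam dependency) that does a Java serialization round trip, which is essentially what SerializableCoder relies on. `JavaSerializationCheck`, `roundTrip`, `opaqueMap` and `JavaAttrsMessage` are hypothetical names; `opaqueMap` only stands in for a non-serializable map such as the `JMapWrapper$$anon$2` in the stack trace. The point: a case class being Serializable is not enough, every object reachable from it must be Serializable too.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import java.util.{AbstractMap => JAbstractMap, HashMap => JHashMap, Map => JMap, Set => JSet}

// Same shape as the case class in the question.
case class MessageWithAttributes(content: String, attrs: Map[String, String])

// Hypothetical variant with a java.util.Map field, used only for the demo.
case class JavaAttrsMessage(content: String, attrs: JMap[String, String])

object JavaSerializationCheck {
  // Java-serializes `value` and reads it back. Throws
  // java.io.NotSerializableException if any object reachable from `value`
  // does not implement java.io.Serializable.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes through this code's class loader (helps under sbt/REPL).
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, getClass.getClassLoader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // A working java.util.Map that is NOT Serializable: an anonymous
  // AbstractMap subclass (AbstractMap does not implement Serializable).
  def opaqueMap(entries: (String, String)*): JMap[String, String] = {
    val backing = new JHashMap[String, String]()
    entries.foreach { case (k, v) => backing.put(k, v) }
    new JAbstractMap[String, String] {
      override def entrySet(): JSet[JMap.Entry[String, String]] = backing.entrySet()
    }
  }

  def main(args: Array[String]): Unit = {
    val ok = MessageWithAttributes("hello", Map("lang" -> "en"))
    println(roundTrip(ok) == ok)

    // The case class is Serializable, but its field value is not, so this fails.
    val bad = JavaAttrsMessage("hello", opaqueMap("lang" -> "en"))
    try roundTrip(bad)
    catch { case e: java.io.NotSerializableException => println(s"failed: ${e.getMessage}") }
  }
}
```

A check like `roundTrip(element)` in a unit test is a cheap way to catch this kind of problem before deploying to Dataflow.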

Re: Trying to understand Unable to encode element exceptions

Posted by Carlos Alonso <ca...@mrcalonso.com>.
I've added a comment with a link to our working stateful and timely
processing solution:
https://github.com/spotify/scio/issues/448#issuecomment-364705100

Re: Trying to understand Unable to encode element exceptions

Posted by Neville Li <ne...@gmail.com>.
Here's a fix to #1020
https://github.com/spotify/scio/pull/1032

Re: Trying to understand Unable to encode element exceptions

Posted by Neville Li <ne...@gmail.com>.
Awesome!
We haven't wrapped any stateful processing API in Scala, but if you have a
working snippet or ideas, it'd be great to share them in that ticket.
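
Since Scio had no Scala wrapper for stateful processing at the time, a stateful and timely DoFn has to be written against the Beam Java API directly. Below is a minimal sketch of what that can look like in Scala, loosely modelled on Beam's GroupIntoBatches (which Carlos was following). It is an illustration under assumptions, not necessarily the approach in the linked scio#448 solution: `BatchStringsFn`, the state/timer IDs and the String element type are hypothetical, and it assumes the Beam Java SDK 2.x state and timer API (`org.apache.beam.sdk.state`).

```scala
import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.beam.sdk.coders.{StringUtf8Coder, VarIntCoder}
import org.apache.beam.sdk.state.{BagState, StateSpec, StateSpecs, TimeDomain, Timer, TimerSpec, TimerSpecs, ValueState}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{OnTimer, ProcessElement, StateId, TimerId}
import org.apache.beam.sdk.transforms.windowing.BoundedWindow
import org.apache.beam.sdk.values.KV

// Hypothetical batching DoFn: buffers values per key, emits a batch every
// `batchSize` elements, and flushes any remainder when the event-time timer
// fires at the end of the window.
class BatchStringsFn(batchSize: Int)
    extends DoFn[KV[String, String], KV[String, JList[String]]] {

  // Beam expects state/timer specs in final fields; a class-body `val` should
  // compile to one. Explicit coders avoid relying on coder inference.
  @StateId("key")
  private val keySpec: StateSpec[ValueState[String]] = StateSpecs.value(StringUtf8Coder.of())

  @StateId("buffer")
  private val bufferSpec: StateSpec[BagState[String]] = StateSpecs.bag(StringUtf8Coder.of())

  @StateId("count")
  private val countSpec: StateSpec[ValueState[Integer]] = StateSpecs.value(VarIntCoder.of())

  @TimerId("endOfWindow")
  private val endOfWindowSpec: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)

  @ProcessElement
  def processElement(
      c: DoFn[KV[String, String], KV[String, JList[String]]]#ProcessContext,
      window: BoundedWindow,
      @StateId("key") key: ValueState[String],
      @StateId("buffer") buffer: BagState[String],
      @StateId("count") count: ValueState[Integer],
      @TimerId("endOfWindow") endOfWindow: Timer): Unit = {
    // Flush leftovers once the watermark passes the end of the window.
    endOfWindow.set(window.maxTimestamp())
    key.write(c.element().getKey)
    buffer.add(c.element().getValue)
    val n = Option(count.read()).fold(0)(_.intValue) + 1
    count.write(n)
    if (n >= batchSize) flush(kv => c.output(kv), key, buffer, count)
  }

  @OnTimer("endOfWindow")
  def onEndOfWindow(
      c: DoFn[KV[String, String], KV[String, JList[String]]]#OnTimerContext,
      @StateId("key") key: ValueState[String],
      @StateId("buffer") buffer: BagState[String],
      @StateId("count") count: ValueState[Integer]): Unit =
    flush(kv => c.output(kv), key, buffer, count)

  // Emits the buffered values (if any) as one batch and resets the per-key state.
  private def flush(
      emit: KV[String, JList[String]] => Unit,
      key: ValueState[String],
      buffer: BagState[String],
      count: ValueState[Integer]): Unit = {
    // Copy before clearing, since read() may return a lazy view over state.
    val batch = new JArrayList[String]()
    val it = buffer.read().iterator()
    while (it.hasNext) batch.add(it.next())
    if (!batch.isEmpty) emit(KV.of[String, JList[String]](key.read(), batch))
    buffer.clear()
    count.clear()
  }
}
```

It would be applied to a keyed `PCollection[KV[String, String]]` with `ParDo.of(new BatchStringsFn(100))`, for example through Scio's `applyTransform` as in its DoFnExample. Buffering a case class instead of a String would also need a coder for the bag state.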

Re: Trying to understand Unable to encode element exceptions

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Thanks Neville!!

Your recommendation worked great. Thanks for your help!!

As a side note, I found this issue:
https://github.com/spotify/scio/issues/448

I can share our experience and help there, as our job (Scio + stateful +
timely processing) is working fine as of today.

Regards!!

Re: Trying to understand Unable to encode element exceptions

Posted by Neville Li <ne...@gmail.com>.
Welcome.

Added an issue so we may improve this in the future:
https://github.com/spotify/scio/issues/1020

Re: Trying to understand Unable to encode element exceptions

Posted by Carlos Alonso <ca...@mrcalonso.com>.
To build the Beam transform, I was following this example:
https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala

To be honest, I don't know how to apply timely and stateful processing
without using a Beam transform, or how to rewrite it using the Scio built-in
you suggest. Could you please give me an example?

Thanks for your help!
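
For the first option Neville suggested (map the values into something Beam can serialize before the raw Beam transform), a minimal sketch, assuming the offending value is the `attrs` map: copy it into a fresh immutable Map using the standard builder. `Normalize`, `plainMap` and `normalize` are hypothetical names; in a Scio pipeline this would presumably be a `.map` step over the values just before applying the stateful Beam transform.

```scala
import scala.collection.JavaConverters._

case class MessageWithAttributes(content: String, attrs: Map[String, String])

object Normalize {
  // Copies any Scala map (including views and Java-backed wrappers) into a
  // fresh immutable Map produced by the standard builder, so only plain,
  // serializable collection classes reach the coder. An explicit builder is
  // used because .toMap on something that is already an immutable Map may
  // just return the same (possibly non-serializable) instance.
  def plainMap(m: scala.collection.Map[String, String]): Map[String, String] = {
    val b = Map.newBuilder[String, String]
    m.foreach(b += _)
    b.result()
  }

  // Hypothetical per-element step to run before the Beam transform.
  def normalize(msg: MessageWithAttributes): MessageWithAttributes =
    msg.copy(attrs = plainMap(msg.attrs))

  def main(args: Array[String]): Unit = {
    val backing = new java.util.HashMap[String, String]()
    backing.put("lang", "en")
    // asScala gives a Scala view backed by the Java map; plainMap copies it out.
    println(plainMap(backing.asScala))
  }
}
```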

Re: Trying to understand Unable to encode element exceptions

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Ok, I’ll try that.

Thanks a lot for your help!!

Re: Trying to understand Unable to encode element exceptions

Posted by Neville Li <ne...@gmail.com>.
I didn't realize the map is inside a case class, which is serializable, but
`java.util.Map` is not, so this won't work transitively.
Your best bet is to write a custom Coder for the entire case class (you can
compose a map coder for the map field) and set it as part of the KvCoder.
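
The custom-coder approach described above could look roughly like the following sketch. It assumes the Beam Java SDK 2.x (`CustomCoder`, `MapCoder`, `CoderUtils`) and Scala 2.12 (`scala.collection.JavaConverters`); `MessageWithAttributesCoder` and `MessageCoders` are hypothetical names.

```scala
import java.io.{InputStream, OutputStream}

import org.apache.beam.sdk.coders.{CustomCoder, KvCoder, MapCoder, StringUtf8Coder}
import org.apache.beam.sdk.util.CoderUtils
import org.apache.beam.sdk.values.KV
import scala.collection.JavaConverters._

case class MessageWithAttributes(content: String, attrs: Map[String, String])

// Hypothetical coder for the whole case class, built from Beam's own coders so
// neither Java serialization nor the Scala Map implementation class matters.
class MessageWithAttributesCoder extends CustomCoder[MessageWithAttributes] {
  private val contentCoder = StringUtf8Coder.of()
  private val attrsCoder = MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())

  override def encode(value: MessageWithAttributes, out: OutputStream): Unit = {
    // Fields are written one after the other in the self-delimiting (nested) form.
    contentCoder.encode(value.content, out)
    attrsCoder.encode(value.attrs.asJava, out)
  }

  override def decode(in: InputStream): MessageWithAttributes = {
    val content = contentCoder.decode(in)
    // Copy into an immutable Scala Map so no Java-backed wrapper leaks downstream.
    val attrs = attrsCoder.decode(in).asScala.toMap
    MessageWithAttributes(content, attrs)
  }
}

object MessageCoders {
  // Candidate replacement for KvCoder.of(StringUtf8Coder.of(), <SerializableCoder>).
  val kvCoder: KvCoder[String, MessageWithAttributes] =
    KvCoder.of(StringUtf8Coder.of(), new MessageWithAttributesCoder)

  def main(args: Array[String]): Unit = {
    val kv = KV.of("key", MessageWithAttributes("hello", Map("lang" -> "en")))
    val bytes = CoderUtils.encodeToByteArray(kvCoder, kv)
    println(CoderUtils.decodeFromByteArray(kvCoder, bytes) == kv)
  }
}
```

In the pipeline this would take the place of the `KvCoder.of(StringUtf8Coder.of(), CoderRegistry...)` argument to `.setCoder(...)`, and the same value coder could be passed to a state spec such as `StateSpecs.bag(...)`. CustomCoder declares itself non-deterministic by default, and MapCoder is non-deterministic too since map iteration order isn't guaranteed. That is fine for a value coder but would need revisiting if this type were ever used as a grouping key.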

On Fri, Jan 19, 2018 at 11:22 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> You mean replacing the Map[String, String] in the case class with a
> java.util.Map<String, String>? And then, how could I set a
> MapCoder<String, String> for that field?
>
> Sorry if these questions are too basic, but this is my first experience
> with Beam...
>
> Thanks!
>
> On Fri, Jan 19, 2018 at 5:19 PM Neville Li <ne...@gmail.com> wrote:
>
>> In this case it's probably easiest to map the Scala `Map[K, V]` into a
>> `java.util.Map<K, V>` and explicitly set a `MapCoder<K, V>` so you don't
>> have to deal with internal coder inference.
>>
>>
>> On Fri, Jan 19, 2018 at 11:03 AM Neville Li <ne...@gmail.com>
>> wrote:
>>
>>> That happens when you mix Beam transforms into Scio, which bypasses the
>>> safety checks we have in place. Map the values into something Beam can
>>> serialize first, or rewrite the transform with a Scio built-in, which
>>> takes care of the KvCoder.
>>>
>>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <ca...@mrcalonso.com>
>>> wrote:
>>>
>>>> I'm following this example:
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>>
>>>> because I'm building something very similar to a group into batches
>>>> functionality. If I don't set the coder manually, this exception arises:
>>>> https://pastebin.com/xxdDMXSf
>>>>
>>>> Thanks!
>>>>
>>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <ne...@gmail.com>
>>>> wrote:
>>>>
>>>>> You shouldn't manually set coder in most cases. It defaults to
>>>>> KryoAtomicCoder for most Scala types.
>>>>> More details:
>>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>>
>>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> May it be because I’m using
>>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes]))) at
>>>>>> some point in the pipeline
>>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>>> outputs a SerializableCoder)?
>>>>>>
>>>>>> This is something I've always wondered. How does one specify a coder
>>>>>> for a case class?
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <ne...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Not sure why it falls back to SerializableCoder. Can you file an GH
>>>>>>> issue with ideally a snippet that can reproduce the problem?
>>>>>>>
>>>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi everyone!!
>>>>>>>>
>>>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>>>> subscription into GCS buckets. In order to do it I'm using both stateful
>>>>>>>> and timely processing and after building and testing the project locally I
>>>>>>>> tried to run it on Google Dataflow and I started getting those errors.
>>>>>>>>
>>>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>>>
>>>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>>>> MessageWithAttributes] and MessageWithAttributes is a case class defined as
>>>>>>>> (content: String, attrs: Map[String, String])
>>>>>>>>
>>>>>>>> The underlying clause is java.io.NotSerializableException:
>>>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's Scio as
>>>>>>>> well) which may suggest that the issue is on serializing the Map, but to be
>>>>>>>> honest, I don't know what does it mean and how to fix it.
>>>>>>>>
>>>>>>>> Can anyone help me, please?
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>

Re: Trying to understand Unable to encode element exceptions

Posted by Carlos Alonso <ca...@mrcalonso.com>.
You mean replacing the Map[String, String] in the case class with a
java.util.Map<String, String>? And then, how could I set that
MapCoder<String, String> for that field?

Sorry if these are newbie questions, but this is my first experience
with Beam...

Thanks!

On Fri, Jan 19, 2018 at 5:19 PM Neville Li <ne...@gmail.com> wrote:

> In this case it's probably easiest to map the Scala `Map[K, V]` into a
> `java.util.Map<K, V>` and explicitly set a `MapCoder<K, V>`, so you don't
> have to deal with internal coder inference.
>

Re: Trying to understand Unable to encode element exceptions

Posted by Neville Li <ne...@gmail.com>.
In this case it's probably easiest to map the Scala `Map[K, V]` into a
`java.util.Map<K, V>` and explicitly set a `MapCoder<K, V>`, so you don't
have to deal with internal coder inference.
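A sketch of this suggestion, again assuming Beam's Java SDK 2.x is on the classpath: the value type becomes `java.util.Map[String, String]`, and the `KvCoder` is built from `MapCoder` explicitly instead of being inferred. `AttrsCoders`, `kvAttrsCoder` and `toJavaKv` are hypothetical names used only for illustration:

```scala
import scala.collection.JavaConverters._

import org.apache.beam.sdk.coders.{KvCoder, MapCoder, StringUtf8Coder}
import org.apache.beam.sdk.values.KV

object AttrsCoders {
  type JAttrs = java.util.Map[String, String]

  // Explicit coder for KV[String, java.util.Map[String, String]]; nothing is inferred.
  val kvAttrsCoder: KvCoder[String, JAttrs] =
    KvCoder.of[String, JAttrs](
      StringUtf8Coder.of(),
      MapCoder.of[String, String](StringUtf8Coder.of(), StringUtf8Coder.of()))

  // Copy the Scala map into a concrete java.util.HashMap before emitting the KV.
  def toJavaKv(key: String, attrs: Map[String, String]): KV[String, JAttrs] =
    KV.of[String, JAttrs](key, new java.util.HashMap[String, String](attrs.asJava))
}
```

The coder would then be attached to the Beam `PCollection` holding these `KV`s with `setCoder(AttrsCoders.kvAttrsCoder)`.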

On Fri, Jan 19, 2018 at 11:03 AM Neville Li <ne...@gmail.com> wrote:

> That happens when you mix Beam transforms into Scio, which defeats the
> safety we have in place. Map the values into something Beam-serializable
> first, or rewrite the transform with a Scio built-in that takes care of the
> KvCoder.
>

Re: Trying to understand Unable to encode element exceptions

Posted by Neville Li <ne...@gmail.com>.
That happens when you mix Beam transforms into Scio, which defeats the
safety we have in place. Map the values into something Beam-serializable
first, or rewrite the transform with a Scio built-in that takes care of the
KvCoder.
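The fallback that failed in this thread is SerializableCoder, which relies on Java serialization: a case class is Serializable, but every field it holds must be serializable too. A minimal standard-library sketch of that transitivity, assuming `OpaqueAttrs` as a hypothetical stand-in for a non-serializable map view like the `JMapWrapper$$anon$2` in the trace; copying the attributes into a `java.util.HashMap` is one way to "map the values into something serializable first":

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

import scala.collection.JavaConverters._

// Hypothetical stand-in for a map view that does NOT implement java.io.Serializable.
class OpaqueAttrs(val values: Map[String, String])

// Case classes are Serializable, but Java serialization must also write each field.
final case class WithOpaqueAttrs(content: String, attrs: OpaqueAttrs)
final case class WithJavaAttrs(content: String, attrs: java.util.HashMap[String, String])

object SerializationCheck {
  // True if Java serialization (what SerializableCoder relies on) can write the value.
  def canJavaSerialize(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Copy the attributes into a concrete, serializable java.util.HashMap.
  def toSerializable(content: String, attrs: Map[String, String]): WithJavaAttrs =
    WithJavaAttrs(content, new java.util.HashMap[String, String](attrs.asJava))
}
```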

On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> I'm following this example:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>
> because I'm building something very similar to the GroupIntoBatches
> transform. If I don't set the coder manually, this exception is thrown:
> https://pastebin.com/xxdDMXSf
>
> Thanks!
>

Re: Trying to understand Unable to encode element exceptions

Posted by Carlos Alonso <ca...@mrcalonso.com>.
I'm following this example:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60

because I'm building something very similar to the GroupIntoBatches
transform. If I don't set the coder manually, this exception is thrown:
https://pastebin.com/xxdDMXSf

Thanks!

On Fri, Jan 19, 2018 at 4:35 PM Neville Li <ne...@gmail.com> wrote:

> You shouldn't need to set the coder manually in most cases. It defaults to
> KryoAtomicCoder for most Scala types.
> More details:
> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>

Re: Trying to understand Unable to encode element exceptions

Posted by Neville Li <ne...@gmail.com>.
You shouldn't need to set the coder manually in most cases. It defaults to
KryoAtomicCoder for most Scala types.
More details:
https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders

On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> Could it be because I’m using
> .setCoder(KvCoder.of(StringUtf8Coder.of(),
> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes]))) at
> some point in the pipeline?
> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
> returns a SerializableCoder.)
>
> This is something I've always wondered: how does one specify a coder for a
> case class?
>
> Regards
>

Re: Trying to understand Unable to encode element exceptions

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Could it be because I’m using
.setCoder(KvCoder.of(StringUtf8Coder.of(),
CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes]))) at
some point in the pipeline?
(CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
returns a SerializableCoder.)

This is something I've always wondered: how does one specify a coder for a
case class?

Regards

On Fri, 19 Jan 2018 at 15:51, Neville Li <ne...@gmail.com> wrote:

> Not sure why it falls back to SerializableCoder. Can you file a GH issue,
> ideally with a snippet that reproduces the problem?
>

Re: Trying to understand Unable to encode element exceptions

Posted by Neville Li <ne...@gmail.com>.
Not sure why it falls back to SerializableCoder. Can you file a GH issue,
ideally with a snippet that reproduces the problem?

On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> Hi everyone!!
>
> I'm building a pipeline to store items from a Google PubSub subscription
> into GCS buckets. In order to do it I'm using both stateful and timely
> processing and after building and testing the project locally I tried to
> run it on Google Dataflow and I started getting those errors.
>
> The full stack trace is here: https://pastebin.com/LqecPhsq
>
> The item I'm trying to serialize is a KV[String, MessageWithAttributes]
> and MessageWithAttributes is a case class defined as (content: String,
> attrs: Map[String, String])
>
> The underlying cause is java.io.NotSerializableException:
> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's Scio as
> well), which may suggest that the issue is in serializing the Map, but to be
> honest, I don't know what it means or how to fix it.
>
> Can anyone help me, please?
> Thanks!
>