Posted to user@beam.apache.org by amir bahmanyari <am...@yahoo.com> on 2016/05/09 23:06:47 UTC

DoFn Deserialize issue

Hi Colleagues,
Sorry I have been bombarding this forum. Not sure what's going on... I resolved a lot of runtime issues, and now get the following exception on my DoFn:

kafkarecords.apply(ParDo.of(
    new DoFn<String, String>() {
      @Override
      public void processElement(ProcessContext ctx) throws Exception {
        System.out.printf("\n from kafka: '%s'  ", ctx.element());
        ctx.output(ctx.element());
      }
    })).apply(TextIO.Write.to("c:\\temp\\KafkaOut\\outputKafka.txt"));

Any idea? I really appreciate your help.
Thanks.

java.lang.IllegalArgumentException: unable to deserialize com.myco.tech.arc.ReadFromKafka2$1@62dae245
        at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
        at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:93)
        at org.apache.beam.sdk.transforms.ParDo$Bound.<init>(ParDo.java:722)
        at org.apache.beam.sdk.transforms.ParDo$Unbound.of(ParDo.java:680)
        at org.apache.beam.sdk.transforms.ParDo$Unbound.access$000(ParDo.java:598)
        at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:565)
        at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:560)
        at com.myco.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:245)

Re: DoFn Deserialize issue

Posted by amir bahmanyari <am...@yahoo.com>.
This is at the bottom of the exceptions... Not sure why KafkaIO still needs a "group.id" property to be set. Perhaps this is why the CompLRRecFn class and its parent class are not deserializable at the top of the stack. Please share your ideas on how to set "group.id" for the KafkaIO call.

Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
        at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)

Thanks for your help.
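
A minimal sketch of the missing setting, assuming the consumer properties can be supplied as a plain map. The Kafka client reports "group.id" as required in the trace above, so the map below carries just that key. The class name and the group name are made up for illustration, and how the map is handed to KafkaIO depends on the SDK version (KafkaIO.Read of this era may offer an updateConsumerProperties(Map) hook, but that is an assumption to check against the version in use).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam or Kafka.
class ConsumerConfigSketch {

    // Builds consumer settings that include the "group.id" key named in the exception.
    static Map<String, Object> consumerConfig(String groupId) {
        Map<String, Object> config = new HashMap<>();
        config.put("group.id", groupId);
        return config;
    }

    public static void main(String[] args) {
        // "my-beam-reader" is a made-up group name.
        System.out.println(consumerConfig("my-beam-reader"));
    }
}
```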


Re: DoFn Deserialize issue

Posted by amir bahmanyari <am...@yahoo.com>.
Oh, sorry... a different exception: this time the static class CompLRRecFn is not deserialized! Should it be "public"?

java.lang.IllegalArgumentException: unable to deserialize com.myco.tech.arc.ReadFromKafka2$CompLRRecFn@62dae245
        at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
        at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:93)

Thanks.


Re: DoFn Deserialize issue

Posted by amir bahmanyari <am...@yahoo.com>.
Thanks Lukasz,
I did the following and still get the same exceptions. Thanks for your help.

class ReadFromKafka2 {
  .....
  static class CompLRRecFn extends DoFn<String, String> {
    //private static final long serialVersionUID = 1L;
    @Override
    public void processElement(ProcessContext ctx) throws Exception {
      System.out.printf("\n from kafka: '%s'  ", ctx.element());
      ctx.output(ctx.element());
    }
  }

  kafkarecords.apply(ParDo.of(new CompLRRecFn())).apply(TextIO.Write.to("/tmp/outputKafka.txt"));
  ..
}


Re: DoFn Deserialize issue

Posted by Robert Bradshaw <ro...@google.com>.
FYI, the jvm option -Dsun.io.serialization.extendedDebugInfo=true can
be handy for debugging serialization errors like this.
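
A small sketch of how that option can be tried outside a pipeline, using only the JDK. The class names below are made up; the nested Connection class stands in for any non-serializable field a DoFn might hold. Running it as "java -Dsun.io.serialization.extendedDebugInfo=true DebugInfoDemo" should append the field path to the exception message, while a plain run reports only the offending class name. The property is read when the serialization classes load, so it needs to be set on the command line rather than later in code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class, not part of Beam.
class DebugInfoDemo {

    // Deliberately not Serializable.
    static class Connection { }

    // Serializable itself, but it holds a field that is not.
    static class Fn implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Connection conn = new Connection();
    }

    // Returns the NotSerializableException message, or null if serialization succeeds.
    static String failureMessage(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage(new Fn()));
    }
}
```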


Re: DoFn Deserialize issue

Posted by Lukasz Cwik <lc...@google.com>.
You can't just mark ReadFromKafka2 as "implements Serializable" and expect it
to work; all the fields used with ReadFromKafka2 have to be serializable as
well.

Instead of using an anonymous inner class, define a static inner class:

class ReadFromKafka2 {
  static class MyDoFn extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext ctx) throws Exception {
      System.out.printf("\n from kafka: '%s'  ", ctx.element());
      ctx.output(ctx.element());
    }
  }

  ...
}

You can instantiate it with "new MyDoFn()".
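
To check a static nested class before submitting a job, a serialize-and-read-back round trip can be run locally. This is a sketch under assumptions: it uses plain Java serialization and a made-up Fn interface in place of DoFn, and roundTrip only imitates in spirit the clone step that appears in the stack trace (SerializableUtils.clone); it is not Beam's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class, not part of Beam.
class StaticNestedFnDemo {

    // Stand-in for a serializable function type such as DoFn.
    interface Fn extends Serializable {
        String apply(String in);
    }

    // Static nested class: it has no reference to an enclosing instance.
    static class MyFn implements Fn {
        private static final long serialVersionUID = 1L;

        @Override
        public String apply(String in) {
            return in;
        }
    }

    // Serializes the value to a byte array and reads a copy back.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Fn copy = roundTrip(new MyFn());
        System.out.println(copy.apply("hello"));
    }
}
```

If the round trip succeeds locally but the job still fails on deserialization, the cause is likely elsewhere, for example in a nested "Caused by" exception further down the trace.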


Re: DoFn Deserialize issue

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Lukasz,
Thanks for your reply.

I changed it to: public class ReadFromKafka2 implements Serializable{

and ran it once with the same anonymous inner class as in my original post, and again after making it explicit as:
kafkarecords.apply(ParDo.named("ReadInput").of(...etc
providing the same implementation as in my original post.

Same throw! :(
Any sample code that works for deployment to a Flink cluster, please?
Not sure if I have my pom.xml straightened out either.
Thanks for your help.


Re: DoFn Deserialize issue

Posted by Lukasz Cwik <lc...@google.com>.
Is ReadFromKafka2 serializable?

Your code snippet uses an anonymous inner class, which means it will
attempt to serialize the outer class as well, in this case ReadFromKafka2.
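
The capture described above can be reproduced with plain Java serialization. This is only a sketch: the Fn interface and the OuterCaptureDemo class are made-up stand-ins for DoFn and the pipeline class, and the anonymous class is created in an instance method and reads a field of the enclosing object, so it is certain to hold a reference to it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a pipeline class; deliberately not Serializable.
class OuterCaptureDemo {

    // Stand-in for a serializable function type such as DoFn.
    interface Fn extends Serializable {
        String apply(String in);
    }

    private final String prefix = "from kafka: ";

    // The anonymous class reads 'prefix', so it keeps a hidden reference
    // to the enclosing OuterCaptureDemo instance.
    Fn anonymousFn() {
        return new Fn() {
            @Override
            public String apply(String in) {
                return prefix + in;
            }
        };
    }

    // Returns true if Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Serializing the function drags in the non-serializable outer object.
        System.out.println(canSerialize(new OuterCaptureDemo().anonymousFn()));
    }
}
```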
