Posted to dev@beam.apache.org by 刘见康 <jk...@gmail.com> on 2016/03/17 06:57:31 UTC

Unable to serialize exception running KafkaWindowedWordCountExample

Hi guys,

I failed to run KafkaWindowedWordCountExample: it throws an "unable to
serialize" exception. The log output and stack trace are below:

16/03/17 13:49:09 INFO flink.FlinkPipelineRunner: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 160 files. Enable logging at DEBUG level to see which files will be staged.
16/03/17 13:49:09 INFO flink.FlinkPipelineExecutionEnvironment: Creating the required Streaming Environment.
16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumer08: Trying to get topic metadata from broker localhost:9092 in try 0/3
16/03/17 13:49:09 INFO kafka.FlinkKafkaConsumerBase: Consumer is going to read the following topics (with number of partitions):
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource@2d29b4ee
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:84)
    at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:194)
    at com.google.cloud.dataflow.sdk.io.Read$Unbounded.<init>(Read.java:189)
    at com.google.cloud.dataflow.sdk.io.Read.from(Read.java:69)
    at org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:129)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: com.google.cloud.dataflow.sdk.options.ProxyInvocationHandler
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50)
    ... 10 more

I found a similar issue in flink-dataflow:
https://github.com/dataArtisans/flink-dataflow/issues/8

Do you have any idea what causes this error?

Thanks
Jiankang

Re: Unable to serialize exception running KafkaWindowedWordCountExample

Posted by Maximilian Michels <mx...@apache.org>.
Hi Jiankang,

Thanks for reporting again. I'm sorry that you ran into another
problem. This example had been working but it has some small problems
with the new code base we just migrated to.

I've fixed and tested the example and would invite you to try again.

Thanks,
Max


Re: Unable to serialize exception running KafkaWindowedWordCountExample

Posted by 刘见康 <jk...@gmail.com>.
@Max:
Thanks for your quick fix; the serialization exception is resolved.
However, a different exception is now reported:
16/03/17 20:14:23 INFO flink.FlinkPipelineRunner: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 158 files. Enable logging at DEBUG level to see which files will be staged.
16/03/17 20:14:23 INFO flink.FlinkPipelineExecutionEnvironment: Creating the required Streaming Environment.
16/03/17 20:14:23 INFO kafka.FlinkKafkaConsumer08: Trying to get topic metadata from broker localhost:9092 in try 0/3
16/03/17 20:14:23 INFO kafka.FlinkKafkaConsumerBase: Consumer is going to read the following topics (with number of partitions):
Exception in thread "main" java.lang.RuntimeException: Flink Sources are supported only when running with the FlinkPipelineRunner.
    at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource.getDefaultOutputCoder(UnboundedFlinkSource.java:71)
    at com.google.cloud.dataflow.sdk.io.Read$Unbounded.getDefaultOutputCoder(Read.java:230)
    at com.google.cloud.dataflow.sdk.transforms.PTransform.getDefaultOutputCoder(PTransform.java:294)
    at com.google.cloud.dataflow.sdk.transforms.PTransform.getDefaultOutputCoder(PTransform.java:309)
    at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:167)
    at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)
    at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137)
    at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88)
    at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:331)
    at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
    at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
    at org.apache.beam.runners.flink.examples.streaming.KafkaWindowedWordCountExample.main(KafkaWindowedWordCountExample.java:127)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

Looking into the UnboundedFlinkSource class, it appears to be a simple
class that implements the UnboundedSource interface and just throws a
RuntimeException from its methods.
Is this Kafka streaming example actually runnable?
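
For readers following the trace: the exception is raised from
getDefaultOutputCoder while the pipeline graph is still being built
(PCollection.apply, then TypedPValue.inferCoderOrFail), not during
execution. Below is a minimal sketch of how a wrapper class of this kind
can fail at that point even when the matching runner is in use. Every
name in it (Source, NativeConsumer, WrapperSource, translate, inferCoder)
is hypothetical and simplified; it is not the actual Beam or Flink code.

```java
class WrapperSourceDemo {

    /** Hypothetical, reduced stand-in for a runner-independent source interface. */
    interface Source {
        String defaultCoderName();
    }

    /** Hypothetical stand-in for a native consumer that only one runner understands. */
    static final class NativeConsumer {
        final String topic;

        NativeConsumer(String topic) {
            this.topic = topic;
        }
    }

    /** Carrier for the native consumer; its Source methods are not meant to be called. */
    static final class WrapperSource implements Source {
        private final NativeConsumer consumer;

        WrapperSource(NativeConsumer consumer) {
            this.consumer = consumer;
        }

        NativeConsumer unwrap() {
            return consumer;
        }

        @Override
        public String defaultCoderName() {
            throw new RuntimeException("Wrapper sources are supported only by the matching runner.");
        }
    }

    /** Translation step of the matching runner: unwraps instead of calling Source methods. */
    static String translate(Source source) {
        if (source instanceof WrapperSource) {
            return "native:" + ((WrapperSource) source).unwrap().topic;
        }
        return "generic:" + source.defaultCoderName();
    }

    /** Generic graph-construction step that asks every source for its coder. */
    static String inferCoder(Source source) {
        return source.defaultCoderName();
    }

    public static void main(String[] args) {
        Source source = new WrapperSource(new NativeConsumer("words"));
        System.out.println(translate(source));
        try {
            inferCoder(source);
        } catch (RuntimeException e) {
            System.out.println("construction-time failure: " + e.getMessage());
        }
    }
}
```

In this sketch the wrapper only works if nothing calls its interface
methods before the runner's translation step, so a coder lookup at graph
construction time is enough to trigger the exception.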

Thanks
Jiankang



Re: Unable to serialize exception running KafkaWindowedWordCountExample

Posted by Maximilian Michels <mx...@apache.org>.
@Dan: You're right that the PipelineOptions shouldn't be cached like
this. In this particular wrapper, it was not even necessary.

@Jiankang: I've pushed a fix to the repository with a few
improvements. Could you please try again? You will have to recompile.

Thanks,
Max


Re: Unable to serialize exception running KafkaWindowedWordCountExample

Posted by Dan Halperin <dh...@google.com.INVALID>.
+Max for the Flink Runner, and +Luke who wrote most of the initial code
around PipelineOptions.

The UnboundedFlinkSource is caching the `PipelineOptions` object, here:
https://github.com/apache/incubator-beam/blob/071e4dd67021346b0cab2aafa0900ec7e34c4ef8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java#L36

I think this is a mismatch with how we intended them to be used. For
example, the PipelineOptions may be changed by a Runner between graph
construction time (when the UnboundedFlinkSource is created) and actual
pipeline execution time. This is partially why, for example,
PipelineOptions are provided by the Runner as an argument to functions like
DoFn.startBundle, .processElement, and .finishBundle.
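
A minimal sketch of the timing problem described above, using only the
JDK. The Options, CachingFn, and ContextFn classes and the "jobName" key
are hypothetical stand-ins, not SDK types; the point is only that an
object which captures options at construction time cannot see a change
made before execution, while one that receives them per call can.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class OptionsTimingDemo {

    /** Hypothetical immutable options holder; with() returns a modified copy. */
    static final class Options {
        private final Map<String, String> values;

        Options(Map<String, String> values) {
            this.values = Collections.unmodifiableMap(new HashMap<>(values));
        }

        String get(String key) {
            return values.get(key);
        }

        Options with(String key, String value) {
            Map<String, String> copy = new HashMap<>(values);
            copy.put(key, value);
            return new Options(copy);
        }
    }

    /** Captures the options at construction time and never sees later changes. */
    static final class CachingFn {
        private final Options cached;

        CachingFn(Options optionsAtConstruction) {
            this.cached = optionsAtConstruction;
        }

        String process(String element) {
            return element + "@" + cached.get("jobName");
        }
    }

    /** Receives the options as an argument on every call, as supplied by the caller. */
    static final class ContextFn {
        String process(String element, Options optionsAtExecution) {
            return element + "@" + optionsAtExecution.get("jobName");
        }
    }

    public static void main(String[] args) {
        Options atConstruction = new Options(Collections.singletonMap("jobName", "draft"));
        CachingFn cachingFn = new CachingFn(atConstruction);
        ContextFn contextFn = new ContextFn();

        // Simulates a runner adjusting the options before execution starts.
        Options atExecution = atConstruction.with("jobName", "final");

        System.out.println(cachingFn.process("a"));              // a@draft (stale value)
        System.out.println(contextFn.process("a", atExecution)); // a@final
    }
}
```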

PipelineOptions itself does not extend Serializable, and per the
PipelineOptions documentation it looks like we intend for it to be
serialized through Jackson rather than through Java serialization. I bet
the Flink runner does this, and we probably just need to remove this cached
PipelineOptions from the unbounded source.
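
The failure mode in the original trace (a NotSerializableException for a
proxy invocation handler) can be reproduced with the JDK alone. The
sketch below is an illustration under assumptions, not the SDK's code:
Options, Handler, CachingSource, and LeanSource are hypothetical names.
It shows that a Serializable class holding a dynamic proxy drags the
proxy's handler into Java serialization, and that keeping only the plain
values it needs avoids the problem.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class CachedOptionsDemo {

    /** Hypothetical stand-in for an options interface; it does not extend Serializable. */
    interface Options {
        String getTopic();
    }

    /** Hypothetical proxy handler; like the handler in the trace, it is not Serializable. */
    static final class Handler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            return "demo-topic";
        }
    }

    /** Builds an Options instance backed by a dynamic proxy. */
    static Options newOptions() {
        return (Options) Proxy.newProxyInstance(
            Options.class.getClassLoader(), new Class<?>[] {Options.class}, new Handler());
    }

    /** Caches the proxy in a field, so Java serialization must also write the handler. */
    static final class CachingSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Options options;

        CachingSource(Options options) {
            this.options = options;
        }
    }

    /** Copies only the plain value it needs; no reference to the proxy is kept. */
    static final class LeanSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String topic;

        LeanSource(Options options) {
            this.topic = options.getTopic();
        }
    }

    /** Returns true if Java serialization accepts the whole object graph. */
    static boolean isJavaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Options options = newOptions();
        System.out.println("caching source: " + isJavaSerializable(new CachingSource(options)));
        System.out.println("lean source: " + isJavaSerializable(new LeanSource(options)));
    }
}
```

Running main should print false for the caching source and true for the
lean one, which matches the suggestion above to drop the cached options
field from the source.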

I'll let Luke and Max correct me on any or all of the above :)

Thanks,
Dan
