Posted to user@beam.apache.org by Borisa Zivkovic <bo...@gmail.com> on 2017/04/28 10:45:11 UTC

AvroCoder + KafkaIO + Flink problem

Hi,

I have a small pipeline that writes data to Kafka (using AvroCoder), and
another job that reads the same data from Kafka, does a few
transformations, and then writes the data back to a different Kafka topic
(AvroCoder again).

The first pipeline is very simple: read data from a text file, create a
POJO, and use AvroCoder to write the POJO to Kafka.

The second pipeline is also simple: read the POJO from Kafka, do a few
transformations, create a new POJO, and write it to Kafka using AvroCoder
again.

When I use the direct runner, everything is OK.

When I switch to the Flink runner (small remote Flink cluster), I get this
exception in the second pipeline:

Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to
test.MyClass
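
The message looks contradictory because the JVM identifies a class by its name plus the class loader that defined it. The sketch below reproduces that effect with the JDK alone; it is not Beam, Flink, or Avro code. The names ClassLoaderClash, IsolatingLoader, loadIsolated, and castFailsAcrossLoaders are made up for illustration, and the nested MyClass only stands in for test.MyClass.

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderClash {
    /** Stand-in for the POJO from the thread; the name is illustrative. */
    public static class MyClass {}

    /** Hypothetical loader that defines MyClass itself instead of delegating to its parent. */
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(MyClass.class.getName())) {
                return super.loadClass(name, resolve); // everything else: normal delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    // Read the class bytes via the parent, but define the class in this loader.
                    String resource = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(resource)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes();
                        c = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    /** Loads MyClass through a fresh isolating loader. */
    public static Class<?> loadIsolated() throws ClassNotFoundException {
        ClassLoader parent = ClassLoaderClash.class.getClassLoader();
        return new IsolatingLoader(parent).loadClass(MyClass.class.getName());
    }

    /** Returns true if an instance from one loader cannot be cast to the class from another. */
    public static boolean castFailsAcrossLoaders() throws Exception {
        Class<?> first = loadIsolated();
        Class<?> second = loadIsolated();
        Object instance = first.getDeclaredConstructor().newInstance();
        try {
            second.cast(instance);
            return false;
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        Class<?> first = loadIsolated();
        Class<?> second = loadIsolated();
        System.out.println("same name:  " + first.getName().equals(second.getName()));
        System.out.println("same class: " + (first == second));
        System.out.println("cast fails: " + castFailsAcrossLoaders());
    }
}
```

Both Class objects report the same name, yet they are different runtime types, so an instance created through one loader is rejected by a cast to the class from the other.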

This happens in the first MapFunction immediately after reading data
from Kafka.

I found out about this problem in Flink and how they resolved it, but I am
not sure how to fix it when using Beam.

https://issues.apache.org/jira/browse/FLINK-1390

test.MyClass has the annotation @DefaultCoder(AvroCoder.class) and is a very
simple POJO.

I am not sure how to fix this and still continue using AvroCoder.

My Beam version is 0.6.0 and my Flink version is 1.2.0.

Has anyone experienced something similar, or does anyone have an idea how to
fix or work around this?

thanks

Re: AvroCoder + KafkaIO + Flink problem

Posted by Davor Bonaci <da...@apache.org>.
And, thanks to Kenn, a likely workaround is in a pending pull request [1].
This should be fixed shortly at HEAD and be a part of the next release.

Davor

[1] https://github.com/apache/beam/pull/2783


On Sat, Apr 29, 2017 at 1:13 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> There were some newer messages on the issue as well (
> https://issues.apache.org/jira/browse/BEAM-1970). The problem occurs if
> you reuse a Flink cluster for running the same job (or some other job that
> uses the same classes) again. The workaround would be to not reuse a
> cluster for several Jobs. If the jar of a Job has to be in the lib folder
> this also means that a cluster cannot be reused for a new jar since the
> cluster has to be restarted when you have a new job/jar so this workaround
> would have roughly the same “complexity”.
>
> I think we’ll finally get rid of that once we move to a cluster-per-job
> model. :-)
>
> Best,
> Aljoscha

Re: AvroCoder + KafkaIO + Flink problem

Posted by Aljoscha Krettek <al...@apache.org>.
There were some newer messages on the issue as well (https://issues.apache.org/jira/browse/BEAM-1970). The problem occurs if you reuse a Flink cluster to run the same job (or some other job that uses the same classes) again. The workaround would be to not reuse a cluster for several jobs. If the jar of a job has to be in the lib folder, a cluster cannot be reused for a new jar either, since the cluster has to be restarted whenever you have a new job/jar, so that workaround would have roughly the same “complexity”.

I think we’ll finally get rid of that once we move to a cluster-per-job model. :-)

Best,
Aljoscha

> On 29. Apr 2017, at 14:45, Stephan Ewen <se...@apache.org> wrote:
> 
> Avro tries to do some magic caching which does not work with multi classloader setups.
> 
> A common workaround for these types of problems is to drop the relevant classes into Flink's "lib" folder and not have them in the job's jar.
> 
> We wrote up some help on that: https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/debugging_classloading.html
> 
> Some background: Flink has two different class loading modes:
> 
> (1) All in the system class loader, which is used for the one-job-Yarn deployments (in the future also for Mesos / Docker / etc)
> 
> (2) Dynamic class loading, with one classloader per deployment. Used in all setups that follow the pattern "start Flink first, launch job later".


Re: AvroCoder + KafkaIO + Flink problem

Posted by Stephan Ewen <se...@apache.org>.
Avro tries to do some magic caching which does not work with multi
classloader setups.

A common workaround for these types of problems is to drop the relevant
classes into Flink's "lib" folder and not have them in the job's jar.

We wrote up some help on that:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/debugging_classloading.html

Some background: Flink has two different class loading modes:

(1) All in the system class loader, which is used for the one-job-Yarn
deployments (in the future also for Mesos / Docker / etc)

(2) Dynamic class loading, with one classloader per deployment. Used in all
setups that follow the pattern "start Flink first, launch job later".
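
To make the caching remark concrete, here is a JDK-only sketch of how a JVM-wide cache keyed by class name can go stale once each job gets its own class loader. It is not Avro's actual code; NameKeyedCacheDemo, JobLoader, Record, and both lookup methods are hypothetical names chosen for the illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NameKeyedCacheDemo {
    /** Stand-in for a user class shipped in each job's jar. */
    public static class Record {}

    /** Hypothetical per-job loader: defines Record itself, delegates everything else. */
    static class JobLoader extends ClassLoader {
        JobLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(Record.class.getName())) {
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    String resource = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(resource)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes();
                        c = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    /** Hypothetical JVM-wide cache keyed only by class name. */
    static final Map<String, Class<?>> BY_NAME = new ConcurrentHashMap<>();

    static Class<?> lookupByName(String name, ClassLoader loader) throws ClassNotFoundException {
        Class<?> cached = BY_NAME.get(name);
        if (cached == null) {
            cached = loader.loadClass(name);
            BY_NAME.put(name, cached);
        }
        return cached; // may belong to a different job's loader
    }

    /** Variant keyed by loader and name. A real cache would hold loaders weakly. */
    static final Map<ClassLoader, Map<String, Class<?>>> BY_LOADER = new ConcurrentHashMap<>();

    static Class<?> lookupByLoader(String name, ClassLoader loader) throws ClassNotFoundException {
        Map<String, Class<?>> perLoader =
                BY_LOADER.computeIfAbsent(loader, l -> new ConcurrentHashMap<>());
        Class<?> cached = perLoader.get(name);
        if (cached == null) {
            cached = loader.loadClass(name);
            perLoader.put(name, cached);
        }
        return cached;
    }

    /** True if the second job is handed the first job's class by the name-keyed cache. */
    public static boolean nameKeyedCacheReturnsStaleClass() throws ClassNotFoundException {
        BY_NAME.clear();
        ClassLoader app = NameKeyedCacheDemo.class.getClassLoader();
        ClassLoader job1 = new JobLoader(app);
        ClassLoader job2 = new JobLoader(app);
        String name = Record.class.getName();
        lookupByName(name, job1);
        return lookupByName(name, job2) != job2.loadClass(name);
    }

    /** True if the loader-keyed cache hands the second job its own class. */
    public static boolean loaderKeyedCacheIsConsistent() throws ClassNotFoundException {
        ClassLoader app = NameKeyedCacheDemo.class.getClassLoader();
        ClassLoader job1 = new JobLoader(app);
        ClassLoader job2 = new JobLoader(app);
        String name = Record.class.getName();
        lookupByLoader(name, job1);
        return lookupByLoader(name, job2) == job2.loadClass(name);
    }

    public static void main(String[] args) throws ClassNotFoundException {
        System.out.println("name-keyed cache is stale: " + nameKeyedCacheReturnsStaleClass());
        System.out.println("loader-keyed cache is consistent: " + loaderKeyedCacheIsConsistent());
    }
}
```

In mode (1) there is only one loader, so a name-keyed cache never sees two versions of a class; in mode (2) the second job on a reused cluster can be handed the class object that the first job's loader defined.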


On Sat, Apr 29, 2017 at 8:02 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Yep, KafkaCheckpointMark also uses AvroCoder, so I’m guessing it’s exactly
> the same problem.
>
> Best,
> Aljoscha

Re: AvroCoder + KafkaIO + Flink problem

Posted by Aljoscha Krettek <al...@apache.org>.
Yep, KafkaCheckpointMark also uses AvroCoder, so I’m guessing it’s exactly the same problem.

Best,
Aljoscha

> On 28. Apr 2017, at 22:29, Jins George <ji...@aeris.net> wrote:
> 
> I also faced a similar issue when restarting a Flink job from a savepoint on an existing cluster. The ClassCastException was with the KafkaCheckpointMark class. It was due to the different class loaders. The workaround for me was to run one job per YARN session. To restart from a savepoint, start a new YARN session and submit.
> 
> Thanks,
> Jins George


Re: AvroCoder + KafkaIO + Flink problem

Posted by Jins George <ji...@aeris.net>.
I also faced a similar issue when restarting a Flink job from a savepoint
on an existing cluster. The ClassCastException was with the
KafkaCheckpointMark class. It was due to the different class loaders.
The workaround for me was to run one job per YARN session. To restart
from a savepoint, start a new YARN session and submit.

Thanks,
Jins George

On 04/28/2017 09:34 AM, Frances Perry wrote:
> I have the same problem and am working around it with 
> SerializableCoder. +1 to a real solution.


Re: AvroCoder + KafkaIO + Flink problem

Posted by Frances Perry <fr...@apache.org>.
I have the same problem and am working around it with SerializableCoder. +1
to a real solution.
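
For context, Beam's SerializableCoder encodes elements with plain Java serialization, so switching to it would mean having the POJO implement java.io.Serializable and annotating it with @DefaultCoder(SerializableCoder.class). The sketch below shows only the underlying round trip with the JDK, not Beam's implementation. SerializableRoundTrip, encode, decode, and the fields of MyClass are assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableRoundTrip {
    /** Stand-in POJO; the fields are invented for the example. */
    public static class MyClass implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String id;
        public final long count;

        public MyClass(String id, long count) {
            this.id = id;
            this.count = count;
        }
    }

    /** Serializes a value to bytes with Java serialization. */
    public static byte[] encode(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    /** Reads a value back from bytes produced by encode. */
    public static Object decode(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MyClass original = new MyClass("record-1", 42L);
        MyClass copy = (MyClass) decode(encode(original));
        System.out.println(copy.id + " " + copy.count);
    }
}
```

One trade-off to keep in mind: the bytes written to Kafka are then Java-specific, so any consumer of that topic needs the same class on its classpath, whereas Avro-encoded data can be read against a schema.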

On Fri, Apr 28, 2017 at 8:46 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> I think you could. But we should also try finding a solution for this
> problem.

Re: AvroCoder + KafkaIO + Flink problem

Posted by Aljoscha Krettek <al...@apache.org>.
I think you could. But we should also try finding a solution for this problem.

> On 28. Apr 2017, at 17:31, Borisa Zivkovic <bo...@gmail.com> wrote:
> 
> Hi Aljoscha,
> 
> this is probably the same problem I am facing.
> 
> I execute multiple pipelines on the same Flink cluster - all launched at the same time... 
> 
> I guess I can try to switch to SerializableCoder and see how that works?
> 
> thanks


Re: AvroCoder + KafkaIO + Flink problem

Posted by Borisa Zivkovic <bo...@gmail.com>.
Hi Aljoscha,

this is probably the same problem I am facing.

I execute multiple pipelines on the same Flink cluster - all launched at
the same time...

I guess I can try to switch to SerializableCoder and see how that works?

thanks



On Fri, 28 Apr 2017 at 16:20 Aljoscha Krettek <al...@apache.org> wrote:

> Hi,
> There is this open issue: https://issues.apache.org/jira/browse/BEAM-1970.
> Could this also be what is affecting you? Are you running several pipelines
> on the same Flink cluster, either one after another or at the same time?
>
> Best,
> Aljoscha

Re: AvroCoder + KafkaIO + Flink problem

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
There is this open issue: https://issues.apache.org/jira/browse/BEAM-1970. Could this also be what is affecting you? Are you running several pipelines on the same Flink cluster, either one after another or at the same time?

Best,
Aljoscha