Posted to user@beam.apache.org by Pawel Szczur <pa...@gmail.com> on 2016/09/01 14:14:56 UTC

Beam + Flink + Proto = NoSuchMethodError

Hi,

I'm trying to run a simple pipeline on a local cluster, using Protocol Buffers
to pass data between Beam functions.

Everything works fine if I run it through:
java -jar target/dataflow-test-1.0-SNAPSHOT.jar \
    --runner=org.apache.beam.runners.flink.FlinkRunner \
    --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt

But it fails when run on a Flink cluster:
flink run target/dataflow-test-1.0-SNAPSHOT.jar \
    --runner=org.apache.beam.runners.flink.FlinkRunner \
    --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt

The ready-to-run project:
https://github.com/orian/beam-flink-local-cluster

Any clues?

Cheers, Pawel

------------------------------------------------------------
 The program finished with the following exception:

java.lang.NoSuchMethodError: com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
    at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Dan Halperin <dh...@google.com>.
Hi Pawel,

https://github.com/apache/incubator-beam/blob/master/runners/flink/pom.xml#L42

Dan
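
(The linked pom line presumably pins the Flink version the runner is built
against — at the time 1.0.3. A sketch of what it would look like; the exact
property name is an assumption, not taken from the thread:)

<!-- runners/flink/pom.xml: pinned Flink version (property name assumed) -->
<properties>
  <flink.version>1.0.3</flink.version>
</properties>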

On Fri, Nov 4, 2016 at 2:08 PM, Pawel Szczur <pa...@gmail.com> wrote:

> Can we bump the version of Flink in Beam? Is there a reason to stick with
> 1.0.3? From my experience it tends to get harder and harder to change in
> the future.

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Pawel Szczur <pa...@gmail.com>.
Can we bump the version of Flink in Beam? Is there a reason to stick with
1.0.3? From my experience it tends to get harder and harder to change in
the future.

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Aljoscha Krettek <al...@apache.org>.
I actually just got it to work by updating the Akka dependency of Flink to
2.4.9 (Akka 2.4.x no longer has a Protobuf dependency; Protobuf only came
into Flink as a transitive dependency). I did this on Flink 1.1.2, so I also
had to update the Flink Runner to 1.1.2 (I also changed the Runner pom to
create a shaded "bundled" jar). Here are the two branches that you can use
to get it to work:

 - https://github.com/aljoscha/flink/tree/flink-1.1.2-akka-2.4.9
 - https://github.com/aljoscha/incubator-beam/tree/flink-1.1.2

The steps I did:
 - checkout Flink
 - run "mvn clean install -DskipTests"
 - checkout Beam
 - run "mvn clean install -DskipTests"
 - build your testing project using "mvn clean package"
 - copy beam-runners-flink_2.10-bundled-0.3.0-incubating-SNAPSHOT.jar from
<beam-root>/runners/flink/runner/target to the lib/ folder of the Flink
install
 - copy the jar from your testing project to the lib/ folder as well (this
is important)
 - only now start the cluster
 - run using "bin/flink run" while also specifying your jar

One caveat is that you have to move the program jar to the lib folder as
well, because of some class loader issues; it doesn't work if you simply
pass it as an argument to "bin/flink run". Also, the Web Dashboard seems not
to work with those two jars in the lib folder, probably because those jars
contain classes that shouldn't really be there.
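
The steps above, collected into a shell sketch (the clone locations, the
test-project directory, and the Flink install path $FLINK_HOME are
assumptions, not taken from the thread):

# Build the patched Flink (Akka 2.4.9) and the matching Beam branch
git clone -b flink-1.1.2-akka-2.4.9 https://github.com/aljoscha/flink.git
(cd flink && mvn clean install -DskipTests)
git clone -b flink-1.1.2 https://github.com/aljoscha/incubator-beam.git
(cd incubator-beam && mvn clean install -DskipTests)

# Build the test project, then stage both jars in Flink's lib/ folder
mvn clean package
cp incubator-beam/runners/flink/runner/target/beam-runners-flink_2.10-bundled-0.3.0-incubating-SNAPSHOT.jar "$FLINK_HOME/lib/"
cp target/dataflow-test-1.0-SNAPSHOT.jar "$FLINK_HOME/lib/"

# Only now start the cluster and submit the job
"$FLINK_HOME/bin/start-cluster.sh"
"$FLINK_HOME/bin/flink" run target/dataflow-test-1.0-SNAPSHOT.jar \
    --runner=org.apache.beam.runners.flink.FlinkRunner \
    --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt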

In the future we should probably provide ready-made packages for this and
update both Flink and Beam.

Cheers,
Aljoscha

P.S. While writing this I just saw your second mail. Good that you also
found a solution! :-)

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Pawel Szczur <pa...@gmail.com>.
Thanks Aljoscha, your solution works.
For future reference: after building the fat jar, you must relocate the package:
jarjar --mode process --rules rules.txt \
    target/dataflow-test-1.0-SNAPSHOT.jar \
    --output target/dataflow-test-1.0-SNAPSHOT.shade.jar

flink run --class com.mycompany.dataflow.WordCount \
    target/dataflow-test-1.0-SNAPSHOT.shade.jar \
    --runner=org.apache.beam.runners.flink.FlinkRunner \
    --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt

I will update the example repo with the solution in case someone has the same
problem.
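
(The rules.txt itself isn't shown in the thread. A minimal version consistent
with the relocation discussed below would look like the following; the target
prefix is a placeholder — anything works, as long as the same relocation is
applied to the Beam classes and the user classes alike:)

# Hypothetical rules.txt — the 'shaded' prefix is arbitrary, not from the thread
cat > rules.txt <<'EOF'
rule com.google.protobuf.** shaded.com.google.protobuf.@1
EOF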

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Pawel Szczur <pa...@gmail.com>.
Thanks.

As far as I can tell, Protobuf is used in two places independently in Beam:
 - in ProtoCoder, as a dependency declared in
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/pom.xml
 - in the FlinkRunner code, as a transitive dependency coming from org.apache.flink

Now I'm wondering: would it be possible to shade com.google.protobuf in the
whole Flink cluster and in flink-runner.jar, while leaving Beam untouched
(so I could use version 3.0.0-beta1 for my proto)?
(I've tried this, and I think it doesn't really work.)

I will try your approach of shading the proto in both Beam and my program. I'm
using jarjar to replace the package name in the jar.

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Aljoscha Krettek <al...@apache.org>.
This seems to be a bigger problem. I got it to compile and (almost) run by
shading away the protobuf dependency of Beam in the Flink Runner jar:
https://github.com/apache/incubator-beam/blob/c83783209f739fc541cea25f22cfe542b75ffa55/runners/flink/runner/pom.xml#L223
This does not really work, though, since now your code will not use a
ProtoCoder but a SerializableCoder for your protobuf-generated class. The
problem is that Beam uses reflection to determine whether a ProtoCoder can
be used on user classes. Now, your user class will be derived from
com.google.protobuf.MessageOrBuilder, but the Beam code will look for
something like flink.relocated.com.google.protobuf.MessageOrBuilder, so that
doesn't work.

The only solution I see for this is to compile both Beam and the user
program, create a fat jar from this, and then create another fat jar where
Protobuf is relocated in all the code, i.e. in both the Beam code and the
user code. That's not a very nice solution, though.
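
For reference, a maven-shade-plugin relocation of the kind described would
look roughly like this (a sketch, not the exact configuration from the
linked pom):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- Rewrites com.google.protobuf references inside the shaded jar;
               user classes compiled against the original package then no
               longer match what Beam's reflection looks for -->
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>flink.relocated.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>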

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Pawel Szczur <pa...@gmail.com>.
I've tried version 2.5.0, no difference. I've found that the problem is in
Beam's Coder code: it assumes Protobuf 3.0, while Flink enforces 2.5.0. I'm
not sure if it's possible to enforce another proto version in Flink, or to
rename it by adding some prefix like 'org.apache.flink' +
'com.google.protobuf' when it is used inside Flink.

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Aljoscha Krettek <al...@apache.org>.
I imagine that it's caused by the classpath being different when you run it
using the Flink command. It might also be that your program fails at a
different point once you fix the first problem, due to the protobuf
mismatches.

On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <pa...@gmail.com> wrote:

> Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then, how is it
> that when I run it as a binary it works fine, but sent to local cluster it
> fails?
>
> 2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>
>> Yep, as I said, the problem is most likely that Flink has a dependency on
>> a different version of protobuf, so that clashes with the version that Beam
>> provides or that you have as a dependency. Have you tried setting 2.5.0 as
>> the version, since that's what Flink uses? In the end that's not a proper
>> solution, however, and both Flink and Beam should likely shade their
>> protobuf dependency; not sure if that's possible, though.
>>
>> On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <pa...@gmail.com> wrote:
>>
>>> Thanks for the reply.
>>>
>>> I've tried that; I think it didn't work. I explicitly tried version
>>> 3.0.0-beta1 to be compatible with Beam, and in another test 3.0.0 just out
>>> of curiosity. It still didn't work.
>>>
>>> I've added explicitly to pom.xml:
>>>
>>> <dependency>
>>>     <groupId>com.google.protobuf</groupId>
>>>     <artifactId>protobuf-java</artifactId>
>>>     <version>3.0.0-beta1</version>
>>> </dependency>
>>>
>>> Did I miss some param?
>>>
>>> 2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <al...@apache.org>:
>>>
>>>> Hi,
>>>> I think this is the classic problem of dependency version conflicts.
>>>> Beam has a dependency on protobuf 3.0.0-beta1 while Flink has a dependency
>>>> on protobuf 2.5.0 (through Akka). I think when running your program through
>>>> the bin/flink command the order in the classpath might be different and
>>>> you're getting the wrong version.
>>>>
>>>> As an immediate fix, I think you could try having your own dependency
>>>> on protobuf and shade that, so that you don't have version conflicts.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <pa...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to run a simple pipeline on local cluster using Protocol
>>>>> Buffer to pass data between Beam functions.
>>>>>
>>>>> Everything works fine if I run it through:
>>>>> java -jar target/dataflow-test-1.0-SNAPSHOT.jar
>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>
>>>>> But it fails when trying to run on flink cluster:
>>>>> flink run target/dataflow-test-1.0-SNAPSHOT.jar
>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>
>>>>> The ready to run project:
>>>>> https://github.com/orian/beam-flink-local-cluster
>>>>>
>>>>> Any clues?
>>>>>
>>>>> Cheers, Pawel
>>>>>
>>>>> ------------------------------------------------------------
>>>>>  The program finished with the following exception:
>>>>>
>>>>> java.lang.NoSuchMethodError:
>>>>> com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
>>>>> at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>>>>
>>>>
>>>
>

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Pawel Szczur <pa...@gmail.com>.
Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then how is it that
it works fine when I run it as a binary, yet fails when submitted to the
local cluster?

2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <al...@apache.org>:

> Yep, as I said, the problem is most likely that Flink has a dependency on
> a different version of protobuf, which clashes with the version that Beam
> provides or that you have as a dependency. Have you tried setting 2.5.0 as
> the version, since that's what Flink uses? In the end that's not a proper
> solution; both Flink and Beam should probably shade their protobuf
> dependency, though I'm not sure that's possible.

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Aljoscha Krettek <al...@apache.org>.
Yep, as I said, the problem is most likely that Flink has a dependency on
a different version of protobuf, which clashes with the version that Beam
provides or that you have as a dependency. Have you tried setting 2.5.0 as
the version, since that's what Flink uses? In the end that's not a proper
solution; both Flink and Beam should probably shade their protobuf
dependency, though I'm not sure that's possible.
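
If you try the 2.5.0 route, pinning it via dependencyManagement should
force it for transitive dependencies too; a minimal, untested sketch for
your pom.xml:

<dependencyManagement>
    <dependencies>
        <!-- Force a single protobuf version across the whole build,
             including versions pulled in transitively. -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>
    </dependencies>
</dependencyManagement>

Keep in mind this only controls your own build; whether Beam's
proto-related code still works against 2.5.0 at runtime is another
question.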

On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <pa...@gmail.com> wrote:

> Thanks for the reply.
>
> I've tried that; I don't think it worked. I explicitly set version
> 3.0.0-beta1 to be compatible with Beam, and in another test 3.0.0, just
> out of curiosity. It still didn't work.
>
> I've added this explicitly to pom.xml:
>
> <dependency>
>     <groupId>com.google.protobuf</groupId>
>     <artifactId>protobuf-java</artifactId>
>     <version>3.0.0-beta1</version>
> </dependency>
>
> Did I miss some param?

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Pawel Szczur <pa...@gmail.com>.
Thanks for the reply.

I've tried that; I don't think it worked. I explicitly set version
3.0.0-beta1 to be compatible with Beam, and in another test 3.0.0, just out
of curiosity. It still didn't work.

I've added this explicitly to pom.xml:

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.0.0-beta1</version>
</dependency>

Did I miss some param?

2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <al...@apache.org>:

> Hi,
> I think this is the classic problem of dependency version conflicts. Beam
> has a dependency on protobuf 3.0.0-beta1, while Flink has a dependency on
> protobuf 2.5.0 (through Akka). When you run your program through the
> bin/flink command, the order of entries on the classpath might be
> different, so you end up with the wrong version.
>
> As an immediate fix, I think you could try declaring your own dependency
> on protobuf and shading it, so that you don't get version conflicts.
>
> Cheers,
> Aljoscha

Re: Beam + Flink + Proto = NoSuchMethodError

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think this is the classic problem of dependency version conflicts. Beam
has a dependency on protobuf 3.0.0-beta1, while Flink has a dependency on
protobuf 2.5.0 (through Akka). When you run your program through the
bin/flink command, the order of entries on the classpath might be
different, so you end up with the wrong version.

As an immediate fix, I think you could try declaring your own dependency
on protobuf and shading it, so that you don't get version conflicts.
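
Something along these lines in your pom.xml might do it (a rough, untested
sketch; the relocated package prefix is just an example, pick your own):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <!-- Rewrite all references to com.google.protobuf in
                         your classes (and in the bundled protobuf-java) to
                         a private package, so the copy on Flink's classpath
                         is never hit. -->
                    <relocation>
                        <pattern>com.google.protobuf</pattern>
                        <shadedPattern>com.mycompany.shaded.protobuf</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>

The relocation only helps if protobuf-java actually ends up inside your fat
jar, so make sure it's a compile-scope dependency there.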

Cheers,
Aljoscha

On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <pa...@gmail.com> wrote:

> Hi,
>
> I'm trying to run a simple pipeline on local cluster using Protocol Buffer
> to pass data between Beam functions.
>
> Everything works fine if I run it through:
> java -jar target/dataflow-test-1.0-SNAPSHOT.jar
> --runner=org.apache.beam.runners.flink.FlinkRunner
> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>
> But it fails when trying to run on flink cluster:
> flink run target/dataflow-test-1.0-SNAPSHOT.jar
> --runner=org.apache.beam.runners.flink.FlinkRunner
> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>
> The ready to run project:
> https://github.com/orian/beam-flink-local-cluster
>
> Any clues?
>
> Cheers, Pawel