Posted to dev@beam.apache.org by Cristian Constantinescu <ze...@gmail.com> on 2022/02/08 20:17:58 UTC

Beam 2.36.0 + Flink 1.13 appears to be broken

Hi everyone,

I am very excited about the 2.36 release, especially the stopReadOffset
addition to the KafkaSourceDescriptors. With it, I can read sections of a
topic and create state, effectively having a bounded Kafka source, before
reading new items that need processing.

Unfortunately, running the pipeline from the Flink CLI produces the
following error:

Pretty printing Flink args:
--detached
--class=namespace.pipeline.App
/opt/bns/etf/data-platform/user_code/pipelines/cerberus-etl-pipelines.jar
--configFilePath=/path/to/config.properties
--runner=FlinkRunner
--streaming
--checkpointingInterval=30000
--stateBackend=filesystem
--stateBackendStoragePath=file:///path/to/state
--numberOfExecutionRetries=2
--fasterCopy
--debugThrowExceptions
java.lang.IncompatibleClassChangeError: Class org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum does not implement the requested interface org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ProtocolMessageEnum
        at org.apache.beam.sdk.transforms.resourcehints.ResourceHints.getUrn(ResourceHints.java:50)
        at org.apache.beam.sdk.transforms.resourcehints.ResourceHints.<clinit>(ResourceHints.java:54)
        at org.apache.beam.sdk.Pipeline.<init>(Pipeline.java:523)
        at org.apache.beam.sdk.Pipeline.create(Pipeline.java:157)
        at lines containing Pipeline.create(options) <--- my code
        at namespace.pipeline.App.main(App.java:42) <-- my code
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Any advice would be appreciated.

Thank you,
Cristian

Re: Beam 2.36.0 + Flink 1.13 appears to be broken

Posted by Tomo Suzuki <su...@google.com>.
Glad to hear that. My pleasure.

Regards,
Tomo

Re: Beam 2.36.0 + Flink 1.13 appears to be broken

Posted by Cristian Constantinescu <ze...@gmail.com>.
Hey Tomo,

Thanks for the tip! It turns out my deployment project (the one that
creates the fat jar) and my pipelines project (the one with actual code)
had mismatching Beam versions.

User error, sorry about that.

Thanks for your help,
Cristian


Re: Beam 2.36.0 + Flink 1.13 appears to be broken

Posted by Tomo Suzuki <su...@google.com>.
IncompatibleClassChangeError usually occurs when a class comes from an
older version of a JAR file.

Do you know which JAR file provides
"org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum"
when the exception happens?

I often use "getProtectionDomain()"
(https://stackoverflow.com/a/56000383/975074) to find the JAR file that a
class was loaded from.
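
The getProtectionDomain() lookup can be sketched roughly as follows. This is
only a minimal illustration using standard JDK calls; the class name
JarLocator and the helper locate() are made up for this example and are not
part of Beam or Flink. It loads the class by name without initializing it and
prints the location (usually a JAR path) that its class loader reported.

```java
import java.security.CodeSource;
import java.security.ProtectionDomain;

// Hypothetical helper (not part of Beam or Flink): reports where a class was loaded from.
public class JarLocator {

    /** Returns the code source location of the class, or a note when none is recorded. */
    public static String locate(Class<?> clazz) {
        ProtectionDomain domain = clazz.getProtectionDomain();
        CodeSource source = (domain == null) ? null : domain.getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Classes loaded by the bootstrap loader (e.g. java.lang.String) have no code source.
            return "(no code source; likely a JDK/bootstrap class)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Nested classes need their binary name, with '$' separators, e.g.
        // org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum
        String name = args.length > 0 ? args[0] : JarLocator.class.getName();
        // 'false' skips static initialization, so only loading happens here.
        Class<?> clazz =
            Class.forName(name, false, Thread.currentThread().getContextClassLoader());
        System.out.println(name + " -> " + locate(clazz));
    }
}
```

Calling locate() on the class named in the exception, from the same process
and just before the failing call, should show which JAR actually supplied it.
If that JAR carries a different Beam version than the SDK classes around it,
the mismatch is the likely cause.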




-- 
Regards,
Tomo