Posted to user@beam.apache.org by Praveen K Viswanathan <ha...@gmail.com> on 2020/10/03 01:33:32 UTC

Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

Hi - We have a Beam pipeline that reads and writes using an SDF-based IO
connector. It works fine on a local machine with the Direct Runner or the
Flink Runner. However, when we build an image of that pipeline along with
Flink and deploy it in a cluster, we get the exception below.

ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    -
> Unhandled exception.
> org.apache.flink.client.program.ProgramInvocationException: The program
> caused an error:
>
> Classpath:
> [file:/opt/flink/flink-web-upload/Booster-bundled-1.0-SNAPSHOT.jar]
> System.out: (none)
> System.err: (none)
>     at
> org.apache.flink.client.program.OptimizerPlanEnvironment.generateException(OptimizerPlanEnvironment.java:149)
>     at
> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:89)
>     at
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
>     at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
>     at
> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
>     at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
>     at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION
>     at
> org.apache.beam.runners.core.construction.PTransformTranslation.<clinit>(PTransformTranslation.java:199)
>     at
> org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:264)
>     at
> org.apache.beam.sdk.Pipeline$2.enterCompositeTransform(Pipeline.java:272)
>     at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)
>     at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>     at
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>     at
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
>     at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:262)
>     at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:212)
>     at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:115)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:82)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>     at com.org.cx.signals.Booster.main(Booster.java:278)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>     at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>     at
> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
>     ... 8 more


In our pom.xml we have created a profile for flink-runner as shown below.

<profiles>
  <profile>
    <id>flink-runner</id>
    <!-- Makes the FlinkRunner available when running a pipeline. -->
    <dependencies>
      <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-flink-1.10</artifactId>
        <version>2.21.0</version>
        <!-- <scope>runtime</scope> -->
      </dependency>
    </dependencies>
  </profile>
</profiles>


And the Docker image uses the following Flink version:

FROM flink:1.10.0-scala_2.12


Both our pipeline and the SDF-based IO connector are on Beam version 2.23.0.
We would appreciate any guidance on what is causing this exception.
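
As the replies below point out, the runner artifact in the profile above is
on 2.21.0 while the rest of the pipeline is on 2.23.0. A build-time guard
such as Maven's dependencyConvergence enforcer rule can surface that kind of
mixed-version classpath before deployment; a minimal sketch, with an
illustrative plugin version:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <version>3.0.0-M3</version> <!-- illustrative version -->
  <executions>
    <execution>
      <id>enforce-convergence</id>
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <!-- Fails the build when two versions of the same artifact
               (e.g. Beam 2.21.0 and 2.23.0) meet on the classpath. -->
          <dependencyConvergence/>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>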

-- 
Thanks,
Praveen K Viswanathan

Re: Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

Posted by Praveen K Viswanathan <ha...@gmail.com>.
Thanks Luke. I will check on the "portable" mode with Flink. Also hoping
that 2.25.0 will be released soon.

Regards,
Praveen

On Mon, Oct 5, 2020 at 11:46 AM Luke Cwik <lc...@google.com> wrote:

> [quoted text trimmed; Luke's reply appears in full below]

-- 
Thanks,
Praveen K Viswanathan

Re: Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

Posted by Luke Cwik <lc...@google.com>.
Impulse in a released version of Apache Beam is only supported if you run
your pipeline in "portable" mode with Flink. See
https://beam.apache.org/documentation/runners/flink/ for some example
instructions on how to run a "portable" pipeline.

I added support for impulse in non-portable pipeline execution to Flink in
https://github.com/apache/beam/pull/12708, which will be available in the
2.25.0 release (this release is currently underway).

On Mon, Oct 5, 2020 at 11:28 AM Praveen K Viswanathan <
harish.praveen@gmail.com> wrote:

> [quoted text trimmed]

Re: Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

Posted by Praveen K Viswanathan <ha...@gmail.com>.
Thanks for sharing the tool Tomo. I will give it a try and let you know.

Regards,
Praveen

On Fri, Oct 2, 2020 at 8:52 PM Tomo Suzuki <su...@google.com> wrote:

> [quoted text trimmed]


-- 
Thanks,
Praveen K Viswanathan

Re: Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

Posted by Praveen K Viswanathan <ha...@gmail.com>.
Hi Luke - If I deploy the pipeline with 2.23.0, then I get the exception
below: "The transform beam:transform:impulse:v1 is currently not supported".

2020-10-03 00:42:31,117 ERROR
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled
> exception.
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: The transform beam:transform:impulse:v1 is
> currently not supported.
>     at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>     at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>     at
> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
>     at
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
>     at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
>     at
> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
>     at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
>     at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException:
> The transform beam:transform:impulse:v1 is currently not supported.
> at
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:133)
>     at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
>     at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>     at
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>     at
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
>     at
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>     at
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:88)
>     at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:117)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:82)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>     at com.oracle.cx.signals.Booster.main(Booster.java:278)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>     ... 10 more


Below are the pipeline translation details from the log.

>
> 2020-10-03 00:42:29,442 WARN  org.apache.beam.sdk.Pipeline
>                  - The following transforms do not have stable unique
> names: ParDo(Anonymous)
> 2020-10-03 00:42:29,445 INFO  org.apache.beam.runners.flink.FlinkRunner
>                   - Executing pipeline using FlinkRunner.
> 2020-10-03 00:42:29,447 INFO  org.apache.beam.runners.flink.FlinkRunner
>                   - Translating pipeline to Flink program.
> 2020-10-03 00:42:29,456 INFO
>  org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment  - Found
> unbounded PCollection. Switching to streaming execution.
> 2020-10-03 00:42:29,462 INFO
>  org.apache.beam.runners.flink.FlinkExecutionEnvironments      - Creating a
> Streaming Environment.
> 2020-10-03 00:42:29,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: jobmanager.rpc.address,
> booster-app-16e2bc1f-jm-689c9bc7f-mvcmg
> 2020-10-03 00:42:29,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: jobmanager.rpc.port, 6123
> 2020-10-03 00:42:29,464 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: jobmanager.heap.size, 1024m
> 2020-10-03 00:42:29,464 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: taskmanager.memory.process.size, 1568m
> 2020-10-03 00:42:29,464 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2020-10-03 00:42:29,464 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: parallelism.default, 1
> 2020-10-03 00:42:29,465 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-10-03 00:42:29,465 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: s3.access-key,
> d7a8b37b50ae280045ac8a2daed0451850a7196a
> 2020-10-03 00:42:29,465 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: s3.secret-key, ******
> 2020-10-03 00:42:29,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: s3.path-style-access, true
> 2020-10-03 00:42:29,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: s3.endpoint,
> https://axo0ime4ahzu.compat.objectstorage.us-phoenix-1.oraclecloud.com
> 2020-10-03 00:42:29,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: zookeeper.sasl.disable, true
> 2020-10-03 00:42:29,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: metrics.reporters, prom
> 2020-10-03 00:42:29,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: metrics.reporter.prom.class,
> org.apache.flink.metrics.prometheus.PrometheusReporter
> 2020-10-03 00:42:29,467 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: metrics.reporter.prom.port, 8080
> 2020-10-03 00:42:29,467 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: blob.server.port, 6124
> 2020-10-03 00:42:29,467 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: query.server.port, 6125
> 2020-10-03 00:42:29,467 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: blob.server.port, 6125
> 2020-10-03 00:42:29,467 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: high-availability, zookeeper
> 2020-10-03 00:42:29,468 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: high-availability.cluster-id, /dev-flink-test
> 2020-10-03 00:42:29,468 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: high-availability.jobmanager.port, 6123
> 2020-10-03 00:42:29,468 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: high-availability.storageDir,
> s3p://dev-flink-test/job-metadata
> 2020-10-03 00:42:29,468 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: high-availability.zookeeper.path.root, /flink
> 2020-10-03 00:42:29,468 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: high-availability.zookeeper.quorum,
> zk-0.zk-hs.default.svc.cluster.local:2181
> 2020-10-03 00:42:29,469 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: jobmanager.heap.size, 976562k
> 2020-10-03 00:42:29,469 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: jobmanager.rpc.port, 6123
> 2020-10-03 00:42:29,469 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: jobmanager.web.port, 8081
> 2020-10-03 00:42:29,469 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: metrics.internal.query-service.port, 50101
> 2020-10-03 00:42:29,470 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: query.server.port, 6124
> 2020-10-03 00:42:29,470 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: state.backend, filesystem
> 2020-10-03 00:42:29,470 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: state.checkpoints.dir,
> s3p://dev-flink-test/checkpoints/
> 2020-10-03 00:42:29,470 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: state.savepoints.dir,
> s3p://dev-flink-test/savepoints/
> 2020-10-03 00:42:29,470 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: taskmanager.heap.size, 976562k
> 2020-10-03 00:42:29,471 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: taskmanager.network.memory.fraction, 0.1
> 2020-10-03 00:42:29,471 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: taskmanager.network.memory.min, 10m
> 2020-10-03 00:42:29,471 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: taskmanager.numberOfTaskSlots, 2
> 2020-10-03 00:42:29,471 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: web.upload.dir, /opt/flink
> 2020-10-03 00:42:29,472 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: high-availability.cluster-id, booster-app-16e2bc1f
> 2020-10-03 00:42:29,472 INFO
>  org.apache.flink.configuration.GlobalConfiguration            - Loading
> configuration property: jobmanager.rpc.address, 172.16.0.144
> 2020-10-03 00:42:29,507 WARN
>  org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment  -
> UnboundedSources present which rely on checkpointing, but checkpointing is
> disabled.
> 2020-10-03 00:42:30,650 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  -
>  enterCompositeTransform-
> 2020-10-03 00:42:30,651 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  enterCompositeTransform- Create.Values
> 2020-10-03 00:42:30,675 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  visitPrimitiveTransform- Create.Values/Read(CreateSource)
> 2020-10-03 00:42:30,865 WARN  org.apache.beam.sdk.coders.SerializableCoder
>                  - Can't verify serialized elements of type BoundedSource
> have well defined equals method. This may produce incorrect results on some
> PipelineRunner
> 2020-10-03 00:42:30,908 INFO
>  org.apache.flink.api.java.typeutils.TypeExtractor             - No fields
> were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot
> be used as a POJO type and must be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance.
> 2020-10-03 00:42:30,976 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  leaveCompositeTransform- Create.Values
> 2020-10-03 00:42:30,977 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  enterCompositeTransform- ParDo(Anonymous)
> 2020-10-03 00:42:30,977 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  visitPrimitiveTransform- ParDo(Anonymous)/ParMultiDo(Anonymous)
> 2020-10-03 00:42:31,000 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  leaveCompositeTransform- ParDo(Anonymous)
> 2020-10-03 00:42:31,001 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  enterCompositeTransform- Parse Signals
> 2020-10-03 00:42:31,001 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  visitPrimitiveTransform- Parse Signals/ParMultiDo(ParseSignals)
> 2020-10-03 00:42:31,014 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  leaveCompositeTransform- Parse Signals
> 2020-10-03 00:42:31,019 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  enterCompositeTransform- Log.LoggingTransform
> 2020-10-03 00:42:31,019 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  enterCompositeTransform- Log.LoggingTransform/ParDo(Anonymous)
> 2020-10-03 00:42:31,023 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |    visitPrimitiveTransform-
> Log.LoggingTransform/ParDo(Anonymous)/ParMultiDo(Anonymous)
> 2020-10-03 00:42:31,041 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  leaveCompositeTransform- Log.LoggingTransform/ParDo(Anonymous)
> 2020-10-03 00:42:31,041 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  leaveCompositeTransform- Log.LoggingTransform
> 2020-10-03 00:42:31,041 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  enterCompositeTransform- View.AsList
> 2020-10-03 00:42:31,041 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  enterCompositeTransform- View.AsList/View.VoidKeyToMultimapMaterialization
> 2020-10-03 00:42:31,041 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |    enterCompositeTransform-
> View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)
> 2020-10-03 00:42:31,041 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |    visitPrimitiveTransform-
> View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
> 2020-10-03 00:42:31,051 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |    leaveCompositeTransform-
> View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)
> 2020-10-03 00:42:31,051 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  leaveCompositeTransform- View.AsList/View.VoidKeyToMultimapMaterialization
> 2020-10-03 00:42:31,051 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  enterCompositeTransform- View.AsList/View.CreatePCollectionView
> 2020-10-03 00:42:31,051 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |    enterCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)
> 2020-10-03 00:42:31,051 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |    enterCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys
> 2020-10-03 00:42:31,051 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |    enterCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys
> 2020-10-03 00:42:31,052 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |   |    enterCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map
> 2020-10-03 00:42:31,052 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |   |   |    visitPrimitiveTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> 2020-10-03 00:42:31,068 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |   |    leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map
> 2020-10-03 00:42:31,068 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |    leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys
> 2020-10-03 00:42:31,068 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |    leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys
> 2020-10-03 00:42:31,068 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |    enterCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
> 2020-10-03 00:42:31,086 INFO
>  org.apache.flink.api.java.typeutils.TypeExtractor             - No fields
> were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot
> be used as a POJO type and must be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance.
> 2020-10-03 00:42:31,105 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |    translated-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
> 2020-10-03 00:42:31,105 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |    leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
> 2020-10-03 00:42:31,105 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |    enterCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values
> 2020-10-03 00:42:31,105 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |    enterCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values
> 2020-10-03 00:42:31,105 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |   |    enterCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map
> 2020-10-03 00:42:31,105 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |   |   |    visitPrimitiveTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |   |    leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |   |    leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |   |    leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |    leaveCompositeTransform-
> View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
> |    visitPrimitiveTransform-
> View.AsList/View.CreatePCollectionView/CreateStreamingFlinkView.CreateFlinkPCollectionView
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  leaveCompositeTransform- View.AsList/View.CreatePCollectionView
> 2020-10-03 00:42:31,115 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  leaveCompositeTransform- View.AsList
> 2020-10-03 00:42:31,116 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |
>  enterCompositeTransform- Read from rawsignal
> 2020-10-03 00:42:31,116 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  - |   |
>  visitPrimitiveTransform- Read from rawsignal/Impulse
> 2020-10-03 00:42:31,116 INFO
>  org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator  -
> beam:transform:impulse:v1


On Mon, Oct 5, 2020 at 9:51 AM Luke Cwik <lc...@google.com> wrote:

> [quoted text trimmed; Luke's reply appears in full below]

-- 
Thanks,
Praveen K Viswanathan

Re: Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

Posted by Luke Cwik <lc...@google.com>.
In your pom.xml you are requesting version 2.21.0 of the Beam Flink runner,
but you are using Beam 2.23.0 elsewhere. These versions need to match. Try
updating your profile to:
<profile>
  <id>flink-runner</id>
  <!-- Makes the FlinkRunner available when running a pipeline. -->
  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-flink-1.10</artifactId>
      <version>2.23.0</version> <!-- was 2.21.0 -->
      <!-- <scope>runtime</scope> -->
    </dependency>
  </dependencies>
</profile>
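
One way to keep the Beam SDK and runner artifacts from drifting apart again
is to pin every Beam artifact to a single Maven property. A minimal sketch,
assuming the pipeline's other Beam dependencies are declared in the same pom:

<properties>
  <beam.version>2.23.0</beam.version>
</properties>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-flink-1.10</artifactId>
  <version>${beam.version}</version>
</dependency>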

On Fri, Oct 2, 2020 at 8:52 PM Tomo Suzuki <su...@google.com> wrote:

> [quoted text trimmed; Tomo's message appears in full below]

Re: Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

Posted by Tomo Suzuki <su...@google.com>.
I suspect your dependencies have a conflict. I develop the Linkage Checker
enforcer rule to identify incompatible dependencies. Do you want to give it
a try?
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/Linkage-Checker-Enforcer-Rule
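
For reference, the wiki wires the rule into maven-enforcer-plugin roughly as
in the sketch below; the plugin and rule versions and the artifact
coordinates here are illustrative, so check the wiki for the current ones:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <version>3.0.0-M3</version> <!-- illustrative version -->
  <dependencies>
    <dependency>
      <!-- illustrative coordinates; see the wiki for the current release -->
      <groupId>com.google.cloud.tools</groupId>
      <artifactId>linkage-checker-enforcer-rules</artifactId>
      <version>1.5.0</version>
    </dependency>
  </dependencies>
  <executions>
    <execution>
      <id>enforce-linkage-checker</id>
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <LinkageCheckerRule
              implementation="com.google.cloud.tools.dependencies.enforcer.LinkageCheckerRule"/>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>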

Regards,
Tomo

On Fri, Oct 2, 2020 at 21:34 Praveen K Viswanathan <ha...@gmail.com>
wrote:

> [quoted original message trimmed; see the top of this thread]
-- 
Regards,
Tomo