Posted to user@beam.apache.org by peay <pe...@protonmail.com> on 2017/04/05 19:52:01 UTC

Unhelpful ExceptionInChainedStubException errors with Flink runner

Hello,

I've been having some trouble with debugging exceptions in user code when using the Flink runner. Here's an example from a window/DoFn/GroupByKey pipeline.

ERROR o.a.f.runtime.operators.BatchTask - Error in task code: CHAIN MapPartition (MapPartition at ParDo(MyDoFn)) -> FlatMap (Transform/Windowing/Window.Assign.out) -> Map (Key Extractor) -> GroupCombine (GroupCombine at GroupCombine: GroupByKey) -> Map (Key Extractor) (1/8)
org.apache.beam.sdk.util.UserCodeException: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) ~[beam-sdks-java-core-0.6.0.jar:0.6.0]
at org.org.my.pipelines.MyDoFn$auxiliary$s09rfuPj.invokeProcessElement(Unknown Source) ~[na:na]
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:198) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:156) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:109) ~[beam-runners-flink_2.10-0.6.0.jar:0.6.0]
at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103) ~[flink-runtime_2.10-1.2.0.jar:1.2.0]
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) [flink-runtime_2.10-1.2.0.jar:1.2.0]
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) [flink-runtime_2.10-1.2.0.jar:1.2.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) [flink-runtime_2.10-1.2.0.jar:1.2.0]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException: null
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:82) ~[flink-runtime_2.10-1.2.0.jar:1.2.0]
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) ~[flink-runtime_2.10-1.2.0.jar:1.2.0]
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:138) ~[beam-runners-flink_2.10-0.6.0.jar:0.6.0]
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:351) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:545) ~[beam-runners-core-java-0.6.0.jar:0.6.0]
at org.my.pipelines.MyDoFn.processElement(MyDoFn.java:49) ~[pipelines-0.1.jar:na]
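The first log line shows Flink chaining the DoFn's MapPartition with the FlatMap, Map and GroupCombine operators that follow it. The bottom trace shows `context.output()` calling straight into `ChainedFlatMapDriver.collect`. The simulation below is a minimal, stdlib-only sketch of that arrangement. It assumes that chaining amounts to direct calls on the same thread. Under that assumption, a failure in a later step unwinds back through the DoFn's output call, which is why `MyDoFn.java:49` is the last user frame even though the error happened downstream. `ChainedStubFailure`, `chained` and `processElement` are hypothetical stand-ins, not Flink or Beam APIs. The stand-in wrapper carries no message of its own, mirroring the `: null` in the trace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ChainingDemo {

    /**
     * Hypothetical stand-in for a chained-operator wrapper exception. It has
     * no message; the real error is only reachable via getWrappedException().
     */
    public static class ChainedStubFailure extends RuntimeException {
        private final Exception wrapped;

        public ChainedStubFailure(Exception wrapped) {
            this.wrapped = wrapped;
        }

        public Exception getWrappedException() {
            return wrapped;
        }
    }

    /** Chains a downstream step: collecting an element runs it synchronously, on the same thread. */
    public static Consumer<String> chained(Consumer<String> downstream) {
        return element -> {
            try {
                downstream.accept(element);
            } catch (Exception e) {
                // The downstream error is wrapped and rethrown to whoever emitted the element.
                throw new ChainedStubFailure(e);
            }
        };
    }

    /** Plays the role of the DoFn: emitting output is a direct call into the next step. */
    public static void processElement(String element, Consumer<String> output) {
        output.accept(element.toUpperCase()); // analogous to context.output(...)
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        Consumer<String> keyExtractor = chained(value -> {
            if (value.isEmpty()) {
                throw new IllegalStateException("downstream step rejected an empty value");
            }
            sink.add(value);
        });

        processElement("a", keyExtractor); // succeeds; sink now holds "A"
        try {
            processElement("", keyExtractor); // fails inside the output call
        } catch (ChainedStubFailure e) {
            System.out.println(e.getMessage());          // prints "null"
            System.out.println(e.getWrappedException()); // prints the IllegalStateException
        }
    }
}
```

Printing the stack trace of the caught `ChainedStubFailure` would show the chained lambda above `processElement`. This is the same shape as `ChainedFlatMapDriver.collect` appearing above `MyDoFn.processElement` in the real trace.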

The top stack trace references some kind of anonymous `invokeProcessElement(Unknown Source)` frame, which is not really informative. The bottom stack trace references my call to `context.output()`, which is even more confusing. I've fixed a couple of issues by manually try/catching and logging directly from within `processElement`, but this is far from ideal. Any advice on how to interpret these traces, and possibly on how to set things up to get more helpful error messages, would be much appreciated.
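One way to make the try/catch workaround more informative is to log the innermost error instead of the exception caught at `context.output()`. The sketch below walks the standard `getCause()` chain. It also follows a `getWrappedException()` accessor via reflection. This assumes that Flink 1.x's `ExceptionInChainedStubException` keeps the downstream error behind that accessor rather than `getCause()`, which is worth checking against the Flink 1.2 sources. `RootCause`, `rootCause` and the `ChainedWrapper` stand-in are hypothetical names used for illustration.

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class RootCause {

    /**
     * Walks from t to the innermost throwable. Besides the standard getCause()
     * chain, it follows a public no-arg getWrappedException() accessor, assumed
     * here to be how Flink 1.x's ExceptionInChainedStubException exposes the
     * downstream error.
     */
    public static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<Throwable, Boolean>());
        Throwable current = t;
        while (current != null && seen.add(current)) {
            Throwable next = wrappedException(current);
            if (next == null) {
                next = current.getCause();
            }
            if (next == null) {
                return current; // nothing further down: this is the root
            }
            current = next;
        }
        return current; // null input, or the chain looped back on itself
    }

    /** Returns the result of getWrappedException() if the class has one, else null. */
    private static Throwable wrappedException(Throwable t) {
        try {
            Method accessor = t.getClass().getMethod("getWrappedException");
            Object wrapped = accessor.invoke(t);
            return (wrapped instanceof Throwable) ? (Throwable) wrapped : null;
        } catch (ReflectiveOperationException | SecurityException e) {
            return null;
        }
    }

    /** Hypothetical stand-in for the chained wrapper: no message, no standard cause. */
    public static class ChainedWrapper extends RuntimeException {
        private final Exception wrapped;

        public ChainedWrapper(Exception wrapped) {
            this.wrapped = wrapped;
        }

        public Exception getWrappedException() {
            return wrapped;
        }
    }

    public static void main(String[] args) {
        Exception real = new IllegalArgumentException("bad key in downstream step");
        // Same shape as the trace: a user-code wrapper around a chained wrapper.
        RuntimeException top = new RuntimeException(new ChainedWrapper(real));
        // prints "java.lang.IllegalArgumentException: bad key in downstream step"
        System.out.println(rootCause(top));
    }
}
```

In `processElement`, you would log `RootCause.rootCause(e)` together with the offending element, then rethrow `e`. This should point at the failing downstream step rather than at the bare wrapper. Reflection is used only to avoid a compile-time dependency on a Flink-internal class. With `flink-runtime` on the classpath, an `instanceof` check plus a direct accessor call would be the non-reflective equivalent.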

Running Beam 0.6, Flink 1.2.

Thanks!

Re: Unhelpful ExceptionInChainedStubException errors with Flink runner

Posted by Aljoscha Krettek <al...@apache.org>.
Sorry for the slow response. I've had this marked in my inbox, but due to
the preparations for Flink Forward I haven't had time to look at it yet.
I'll definitely still do that.

On Thu, Apr 13, 2017, at 05:15, Davor Bonaci wrote:
> Aljoscha, any ideas perhaps?

Re: Unhelpful ExceptionInChainedStubException errors with Flink runner

Posted by Davor Bonaci <da...@apache.org>.
Aljoscha, any ideas perhaps?
