Posted to user@beam.apache.org by rahul patwari <ra...@gmail.com> on 2019/03/27 16:17:04 UTC

NullPointerException - Session windows with Lateness in FlinkRunner

Hi,
I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink Cluster -
1.7.2.

I have this flow in my pipeline:
KafkaSource(withCreateTime())  -->  ApplyWindow(SessionWindow with
gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes, default
trigger)  -->  BeamSQL(GroupBy query)  -->  Window.remerge()  -->
Enrichment  -->  KafkaSink

I generate the data so that the first two records fall into two different
sessions. I then send a third record before the first session expires, with
a timestamp chosen so that the two sessions merge into a single session.

For example, these are the sample input and the output I obtained when I ran
the same pipeline with the DirectRunner.

Sample Input:
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}

Sample Output:
{"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}
{"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}
{"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}

Here "NumberOfRecords" is the count, and "WST" and "WET" are the Avro field
names for the start time and end time of the session window. I obtain "WST"
and "WET" after remerging and applying a ParDo (the Enrichment stage of the
pipeline).
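
As a rough check of the window boundaries above, the following Java sketch
gives each record a proto-window of [timestamp, timestamp + gap) and merges
windows that overlap, which is how session windowing is usually described.
It uses only the JDK and is not Beam code; the class and method names are
made up for illustration. With a 1-minute gap, the first two sample records
stay in separate sessions, and the third record bridges them into a single
session from 15-27-44 to 15-29-51 holding three records.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration class; not part of Beam or of the pipeline in this thread.
final class SessionMergeSketch {

    /** One session, modeled as the half-open interval [start, end) plus a record count. */
    static final class Session {
        final LocalDateTime start;
        final LocalDateTime end;
        final int count;

        Session(LocalDateTime start, LocalDateTime end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ") records=" + count;
        }
    }

    /** Assigns each timestamp the window [ts, ts + gap) and merges overlapping windows. */
    static List<Session> merge(List<LocalDateTime> timestamps, Duration gap) {
        List<LocalDateTime> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<Session> sessions = new ArrayList<>();
        for (LocalDateTime ts : sorted) {
            LocalDateTime end = ts.plus(gap);
            if (!sessions.isEmpty()) {
                Session last = sessions.get(sessions.size() - 1);
                // Timestamps are sorted, so the new window overlaps the last
                // session exactly when it starts before that session ends.
                if (ts.isBefore(last.end)) {
                    LocalDateTime mergedEnd = end.isAfter(last.end) ? end : last.end;
                    sessions.set(sessions.size() - 1,
                            new Session(last.start, mergedEnd, last.count + 1));
                    continue;
                }
            }
            sessions.add(new Session(ts, end, 1));
        }
        return sessions;
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofMinutes(1);
        LocalDateTime first = LocalDateTime.of(2019, 3, 27, 15, 27, 44);
        LocalDateTime second = LocalDateTime.of(2019, 3, 27, 15, 28, 51);
        LocalDateTime third = LocalDateTime.of(2019, 3, 27, 15, 28, 26);

        // Before the third record arrives: two separate sessions.
        System.out.println(merge(Arrays.asList(first, second), gap));
        // After the third record arrives: one merged session.
        System.out.println(merge(Arrays.asList(first, second, third), gap));
    }
}
```

The sketch ignores watermarks, triggers and allowed lateness; it only
reproduces the interval arithmetic behind the "WST" and "WET" values.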

The program runs successfully on the DirectRunner. On the FlinkRunner,
however, I get this exception when the third record arrives:

2019-03-27 15:31:00,442 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map ->
DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow)
-> (Window.Into()/Window.Assign.out ->
DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous)
->
DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group
by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem,
DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
-> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default
key/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
ProducerRecord/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))
(1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to RUNNING.
2019-03-27 15:33:25,427 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        -
DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey
->
DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
->
DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous)
-> DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc) ->
DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous) ->
DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo)
->
DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
-> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default
key/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
ProducerRecord/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
(1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to FAILED.
org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
        at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:457)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:703)
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:674)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.mergeAccumulators(FlinkStateInternals.java:517)
        at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:192)
        at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:162)
        at org.apache.beam.runners.core.NonEmptyPanes$GeneralNonEmptyPanes.onMerge(NonEmptyPanes.java:132)
        at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:507)
        at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
        at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
        at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
        at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
        at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
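
Reading the "Caused by" frames from the bottom up, the failure occurs while
combining state is merged after the windows are merged
(ReduceFnRunner.mergeWindows -> NonEmptyPanes.onMerge ->
StateMerging.mergeCombiningValues -> FlinkCombiningState.mergeAccumulators).
One possible reading, which is an assumption and is not confirmed anywhere
in this thread, is that a null accumulator reached the combine function.
The Java sketch below is a hypothetical stand-in and not Beam's
implementation; it only shows how a merge loop over long[] accumulators
fails on a null entry, and how a variant that skips nulls does not.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; the names are invented and this is not Beam code.
final class NullAccumulatorSketch {

    /** Sums single-slot accumulators; dereferences every entry, so a null entry throws. */
    static long[] mergeUnchecked(Iterable<long[]> accumulators) {
        long[] result = new long[] {0L};
        for (long[] accum : accumulators) {
            result[0] += accum[0]; // NullPointerException if accum is null
        }
        return result;
    }

    /** Same merge, but treats a null accumulator as "no state yet" and skips it. */
    static long[] mergeSkippingNulls(Iterable<long[]> accumulators) {
        long[] result = new long[] {0L};
        for (long[] accum : accumulators) {
            if (accum != null) {
                result[0] += accum[0];
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The null entry stands in for a window whose state was never written.
        List<long[]> accumulators = Arrays.asList(new long[] {1L}, null, new long[] {2L});

        System.out.println(mergeSkippingNulls(accumulators)[0]);
        try {
            mergeUnchecked(accumulators);
        } catch (NullPointerException e) {
            System.out.println("unchecked merge failed on the null accumulator");
        }
    }
}
```

Whether the real fix belongs in the state implementation or in the combine
function is a separate question that this sketch does not answer.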

Is this a known issue with the FlinkRunner? Are session windows with lateness
@experimental in the FlinkRunner?

I also tried Runner - beam-runners-flink_2.11 with Flink Cluster - 1.5.3 and
came across the same exception.

I also tried generating data with the lateness set to 0, and everything
works as expected. It seems there is no problem merging the windows of
records that belong to the same session.

Thanks,
Rahul

Re: NullPointerException - Session windows with Lateness in FlinkRunner

Posted by Maximilian Michels <mx...@apache.org>.
Hi Rahul,

Thanks for providing the detailed report. This looks like a bug rather
than a limitation of the Flink Runner. We have integration tests for
session windows with the Flink Runner, but they seem to have missed
this issue.

Let me investigate and get back to you. Tracking issue: 
https://jira.apache.org/jira/browse/BEAM-6929

Thanks,
Max

On 28.03.19 03:01, rahul patwari wrote:
> +dev

Re: NullPointerException - Session windows with Lateness in FlinkRunner

Posted by rahul patwari <ra...@gmail.com>.
+dev


Re: NullPointerException - Session windows with Lateness in FlinkRunner

Posted by rahul patwari <ra...@gmail.com>.
+dev
