Posted to user@beam.apache.org by Aljoscha Krettek <al...@apache.org> on 2020/11/11 11:21:11 UTC

Re: Empty output when job stops

I assigned the issue to myself and whipped up a quick test case to check 
what our expectations for this are: 
https://github.com/aljoscha/beam/commit/cd0cea6c740846c0eb79091b0e7862487facf07b

Is this the behaviour we should go for? What's interesting is that all 
state backends create an empty accumulator when you try to read state 
for a key that doesn't exist yet. Is this even the expected behaviour? 
The first commented-out line would test for that.

As is, all StateInternals implementations that use this test base fail 
that test.
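
To make the expectation concrete without clicking through, here is a 
minimal sketch in the spirit of that test. It assumes the usual 
StateInternalsTest fixtures and Hamcrest asserts; underTest, NAMESPACE_1 
and SUM_INTEGER_ADDR are illustrative names, not necessarily what the 
commit uses:

    // Read combining state for a key/namespace that was never written.
    // Today every backend materializes a fresh empty accumulator here
    // instead of reporting "nothing stored".
    CombiningState<Integer, int[], Integer> state =
        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);

    // The commented-out assertion mentioned above would be roughly:
    // assertThat(state.read(), is(nullValue()));

    assertThat(state.isEmpty().read(), is(true));
    state.add(1);
    assertThat(state.isEmpty().read(), is(false));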

On 19.10.20 18:17, Luke Cwik wrote:
> Aljoscha, I think having this fixed is more important than maintaining the
> update support, since the current behavior leads to incorrect results.
> 
> On Fri, Oct 16, 2020 at 2:06 AM Andrés Garagiola <an...@gmail.com>
> wrote:
> 
>> Hi Aljoscha,
>>
>> I printed the stack trace in the createAccumulator() method of my combiner:
>>
>> taskmanager_1  | java.lang.Thread.getStackTrace(Thread.java:1559)
>> taskmanager_1  | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:29)
>> taskmanager_1  | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:20)
>> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.read(FlinkStateInternals.java:510)
>> taskmanager_1  | org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:127)
>> taskmanager_1  | org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1059)
>> taskmanager_1  | org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
>> taskmanager_1  | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
>> taskmanager_1  | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>> taskmanager_1  | org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>> taskmanager_1  | org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>> taskmanager_1  | org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>> taskmanager_1  | org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:128)
>> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:924)
>> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:913)
>> taskmanager_1  | org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
>> taskmanager_1  | org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:702)
>> taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:681)
>> taskmanager_1  | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
>> taskmanager_1  | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>> taskmanager_1  | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>> taskmanager_1  | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
>> taskmanager_1  | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>> taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>> taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>> taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>> taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>> taskmanager_1  | org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> taskmanager_1  | org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> taskmanager_1  | java.lang.Thread.run(Thread.java:748)
>>
>> Let me know if there is something else in which I can help.
>>
>> Regards
>>
>> On Thu, Oct 15, 2020 at 8:34 PM Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> There are multiple things that come together here, I'm afraid:
>>>
>>> 1. There is additional output when stopping with a savepoint. It would
>>> be good to know where that comes from.
>>>
>>> 2. The state internals implementation does in fact seem wrong. We don't
>>> differentiate the cases of "never created an accumulator" and "my
>>> accumulator is null".
>>>
>>> @Andrés, could you put breakpoints in your Combiner implementation and
>>> see when that second output happens and why it happens? (A stack trace
>>> would probably help.)
>>>
>>> Regarding the state internals: we would basically need to introduce one
>>> more layer; instead of keeping an AccumT we would need to keep an
>>> Option<AccumT> or something of that sort. I'm deliberately not saying
>>> Java Optional here, since Optional cannot represent a present-but-null
>>> value. However, changing the state type would have the consequence that
>>> savepoints are no longer compatible, i.e. you cannot restore a job from
>>> before this change using a Beam version after this change. So I'm very
>>> reluctant.
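
A minimal sketch of what that extra layer could look like (AccumHolder is 
a hypothetical name, not existing Beam code):

    // Distinguishes "never created" (no holder stored in state) from
    // "created, but null" (a holder whose payload is null). Deliberately
    // not java.util.Optional, which cannot represent a present-but-null value.
    final class AccumHolder<AccumT> {
      private final AccumT accum; // may legitimately be null

      AccumHolder(AccumT accum) {
        this.accum = accum;
      }

      AccumT get() {
        return accum;
      }
    }

Flink state would then store AccumHolder<AccumT> instead of AccumT, so a 
missing value means "no accumulator yet". Changing the stored type changes 
the state coder, which is exactly why older savepoints would no longer 
restore.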
>>>
>>>
>>> On 15.10.20 11:51, Andrés Garagiola wrote:
>>>> Thanks Luke, Aljoscha
>>>>
>>>> Let me know if I can help you to reproduce the problem.
>>>> In my case the state is never set to null, but I think it becomes
>>>> null while the job is stopping. Once I run the job again from the
>>>> savepoint, the state is recovered normally.
>>>>
>>>> Let's show this with an example:
>>>>
>>>> t0: Add input 1 => accu state [1] => output [1]
>>>> t1: Add input 2 => accu state [1,2] => output [1,2]
>>>> t2: stop job with savepoint => output [1,2] and an unexpected empty output []
>>>> t3: run job from savepoint => accu state [1,2] => no output
>>>> t4: Add input 3 => accu state [1,2,3] => output [1,2,3]
>>>>
>>>> Regards
>>>>
>>>> On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek <al...@apache.org>
>>>> wrote:
>>>>
>>>>> I'll take a look.
>>>>>
>>>>> On 14.10.20 18:49, Luke Cwik wrote:
>>>>>> Assuming that null means that the accumulator was never created is not
>>>>>> right, especially if null is a valid terminal state while the
>>>>>> initial accumulator value is non-null. This is uncommon but possible.
>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-11063.
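
A hedged illustration of that corner case (a hypothetical CombineFn, 
assuming org.apache.beam.sdk.transforms.Combine; not code from this 
thread):

    // The initial accumulator is non-null, yet null is a reachable,
    // meaningful accumulator value, so "stored accumulator == null"
    // cannot be read as "accumulator was never created".
    class NullableLastFn extends Combine.CombineFn<String, String, String> {
      @Override
      public String createAccumulator() {
        return ""; // non-null initial accumulator
      }

      @Override
      public String addInput(String accum, String input) {
        // a "tombstone" input legitimately drives the accumulator to null
        return "tombstone".equals(input) ? null : input;
      }

      @Override
      public String mergeAccumulators(Iterable<String> accums) {
        String last = "";
        for (String a : accums) {
          last = a; // keep the last accumulator, which may be null
        }
        return last;
      }

      @Override
      public String extractOutput(String accum) {
        return accum; // null is a valid terminal output here
      }
    }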
>>>>>>
>>>>>> +Aljoscha Krettek <al...@apache.org> Is this something you can
>>>>>> take a look at?
>>>>>>
>>>>>> On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola
>>>>>> <andresgaragiola@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have this problem in a stream pipeline using the Apache Flink
>>>>>>> runner (Flink 1.9). I want to upgrade my job: I first stop the job
>>>>>>> through the Flink API, creating a savepoint, and then I start the
>>>>>>> new version through the Flink API, passing the savepoint path.
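
A hedged sketch of that stop/upgrade sequence with Flink's Java client 
(method names per Flink 1.9's ClusterClient; treat as approximate):

    // Stop the running job and take a savepoint in one step; returns the
    // savepoint path. With advanceToEndOfEventTime=true, Flink emits a
    // final MAX_WATERMARK before stopping, which fires all remaining
    // event-time timers.
    String savepointPath =
        client.stopWithSavepoint(jobId, /* advanceToEndOfEventTime= */ false, "/savepoints");

    // The new job version is then submitted with
    // SavepointRestoreSettings.forPath(savepointPath).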
>>>>>>>
>>>>>>> When the job stops, two new records are emitted. The first one is
>>>>>>> OK, but the second one is an empty record.
>>>>>>>
>>>>>>>
>>>>>>> My pipeline uses this window strategy:
>>>>>>>
>>>>>>>
>>>>>>> Window<KV<String, TaggedEvent>> window =
>>>>>>>     Window.<KV<String, TaggedEvent>>into(
>>>>>>>             CalendarWindows.days(this.options.getWindowDays()))
>>>>>>>         .triggering(
>>>>>>>             AfterWatermark.pastEndOfWindow()
>>>>>>>                 .withEarlyFirings(
>>>>>>>                     AfterProcessingTime.pastFirstElementInPane()
>>>>>>>                         .plusDelayOf(delay))
>>>>>>>                 .withLateFirings(
>>>>>>>                     AfterProcessingTime.pastFirstElementInPane()))
>>>>>>>         .withAllowedLateness(
>>>>>>>             Duration.standardSeconds(this.options.getAllowedLateness()))
>>>>>>>         .accumulatingFiredPanes();
>>>>>>>
>>>>>>>
>>>>>>> I implemented a custom combiner, and I realized that the state of the
>>>>>>> combiner is null in the second output. The null check at this line (
>>>>>>> https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507
>>>>>>> ) evaluates to false, and read() then creates an empty accumulator.
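
For context, the behaviour at that line is roughly the following (a hedged 
paraphrase of FlinkStateInternals$FlinkCombiningState, not a verbatim copy; 
flinkValueState is an illustrative field name):

    @Override
    public OutputT read() {
      // null if nothing was ever stored for the current key/namespace
      AccumT accum = flinkValueState.value();
      if (accum != null) {
        return combineFn.extractOutput(accum);
      }
      // A fresh empty accumulator is created and extracted, which is what
      // produces the empty second record instead of "no value".
      return combineFn.extractOutput(combineFn.createAccumulator());
    }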
>>>>>>>
>>>>>>>
>>>>>>> Is this the expected behavior?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
> 


Re: Empty output when job stops

Posted by Aljoscha Krettek <al...@apache.org>.
Btw, I think we won't be done fixing the original problem with just 
this. I suspect we would also need to touch the code that decides 
whether to emit a result for a pane. Or maybe fixing the code that 
checks for empty panes will already be sufficient.
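
To make that concrete, a self-contained sketch (hypothetical names, not 
Beam's actual classes) of the kind of emptiness guard in question:

    // Track whether the current pane saw any elements and suppress
    // firing for panes that did not. Beam's real pane-firing logic in
    // runners-core is subtler (an empty ON_TIME pane may still be
    // emitted on purpose), so this only illustrates the shape of the check.
    final class PaneGuard {
      private long elementsInPane = 0;

      void onElement() {
        elementsInPane++;
      }

      boolean shouldEmit(boolean isOnTimePane) {
        boolean emit = elementsInPane > 0 || isOnTimePane;
        elementsInPane = 0; // reset for the next pane
        return emit;
      }
    }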

Also, please check out my branch instead of just the commit I posted 
earlier because I pushed another commit: 
https://github.com/aljoscha/beam/tree/beam-11063-null-value-combinefn
