Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2021/09/10 15:46:07 UTC

Does python DirectRunner correctly set watermark hold for timers?

Hi,

I haven't dug into the code yet; I just wanted to ask whether someone
could give me some pointers. I'm seeing odd behavior in the Python SDK
when emitting data from a stateful DoFn with an event-time timer set to
GlobalWindow().max_timestamp(). The data is emitted correctly, but it
probably ends up behind the watermark and is dropped at the first GBK
downstream (assert_that in my case), so the assertion fails. With
FlinkRunner it seems to work as expected (the test passes).
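
To make the suspected failure mode concrete, here is a toy model in
plain Python. It is not DirectRunner code and uses no Beam APIs; the
function names and the integer timestamps are made up for illustration.
It only shows why a missing watermark hold on a pending event-time timer
would make output produced at the end of the global window land in a
window that downstream already considers expired.

```python
# Toy model only: hypothetical names, integer timestamps, no Beam APIs.
GLOBAL_WINDOW_END = 1_000  # stand-in for GlobalWindow().max_timestamp()
END_OF_TIME = 1_001        # stand-in for a watermark advanced "to infinity"


def downstream_watermark(input_watermark, pending_timer_ts, hold_timers):
    """Watermark the stateful step reports to downstream transforms."""
    if hold_timers and pending_timer_ts is not None:
        # A pending event-time timer holds the output watermark back.
        return min(input_watermark, pending_timer_ts)
    return input_watermark


def window_expired(window_end, watermark, allowed_lateness=0):
    """A window is expired once the watermark has passed its end."""
    return watermark > window_end + allowed_lateness


def gbk_outcome(hold_timers):
    # The input is exhausted, so the input watermark jumps to the end of
    # time while the end-of-window timer is still pending.
    watermark = downstream_watermark(
        END_OF_TIME, GLOBAL_WINDOW_END, hold_timers)
    # The timer then fires and emits the buffered data in the global
    # window; the downstream GBK checks whether that window is expired.
    if window_expired(GLOBAL_WINDOW_END, watermark):
        return "dropped"
    return "kept"
```

In this model gbk_outcome(True) keeps the data and gbk_outcome(False)
drops it. Whether DirectRunner actually behaves like the
hold_timers=False case is the open question here.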

Did anyone see something like this before?

Thanks,

  Jan


Re: Does python DirectRunner correctly set watermark hold for timers?

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Pablo,

I'm not using TestStream; the test uses Create. Changing the input to
TestStream.add_elements(...).advance_watermark_to_infinity() seems to
have no impact on the outcome: the outputs are still produced and then
dropped. The pipeline logic is fairly simple: buffer data for a defined
amount of processing time, then flush it. The event-time timer forces
the buffer to be flushed at the end of the global window (Python seems
to be missing @OnWindowExpiration as of now).
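
For readers who want the buffering logic spelled out, here is a rough
model of it in plain Python. It is a sketch only: the class and method
names are hypothetical, no Beam state or timer APIs are used, and
arming the processing-time timer on the first buffered element is an
assumption rather than something stated above.

```python
class BufferingModel:
    """Hypothetical stand-in for the stateful DoFn; no Beam APIs."""

    def __init__(self, flush_interval):
        self.flush_interval = flush_interval
        self.buffer = []
        self.flush_at = None  # pending processing-time timer, if any

    def process(self, element, now):
        # Buffer the element and arm the processing-time timer if needed.
        self.buffer.append(element)
        if self.flush_at is None:
            self.flush_at = now + self.flush_interval

    def advance_processing_time(self, now):
        # Processing-time timer: flush once the interval has elapsed.
        if self.flush_at is not None and now >= self.flush_at:
            return self._flush()
        return []

    def on_window_end(self):
        # Event-time timer at the end of the global window: flush
        # whatever is still buffered.
        return self._flush()

    def _flush(self):
        flushed, self.buffer, self.flush_at = self.buffer, [], None
        return flushed
```

The output of on_window_end() corresponds to the data that is produced
and then dropped in the failing test.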

Regarding the runner, I'm using the default, which (on Beam 2.32.0) is
what you get from

    beam.Pipeline(options=PipelineOptions(["--streaming"]))

With this, the test seems to fail 100% of the time. Changing the above to

    beam.Pipeline(options=PipelineOptions(["--streaming", "--runner=flink"]))

makes the test pass, seemingly 100% of the time.

  Jan

On 9/13/21 7:58 PM, Pablo Estrada wrote:
> Hi Jan,
> are you using a TestStream? Do you know which 
> directrunner implementation is being used? (FnApiRunner or 
> BundleBasedDirectRunner?)
> -P.

Re: Does python DirectRunner correctly set watermark hold for timers?

Posted by Pablo Estrada <pa...@google.com>.
Hi Jan,
Are you using a TestStream? Do you know which DirectRunner implementation
is being used? (FnApiRunner or BundleBasedDirectRunner?)
-P.

On Mon, Sep 13, 2021 at 10:45 AM Ahmet Altay <al...@google.com> wrote:

> +Pablo Estrada <pa...@google.com> might know the answer.

Re: Does python DirectRunner correctly set watermark hold for timers?

Posted by Ahmet Altay <al...@google.com>.
+Pablo Estrada <pa...@google.com> might know the answer.

On Fri, Sep 10, 2021 at 8:46 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I didn't dig into the code yet, just wanted to ask if someone could give
> me some pointers. I'm experiencing weird behavior in python SDK, when
> emitting data from stateful DoFn with event-time timer set on
> GlobalWindow().max_timestamp(). The data is correctly emitted, but is
> probably delayed after watermark and is dropped on first GBK downstream
> (assert_that in my case), the assertion then fails. When I use
> FlinkRunner it seems to work as expected (the test passes).
>
> Did anyone see something like this before?
>
> Thanks,
>
>   Jan
>
>