Posted to user@beam.apache.org by KV 59 <kv...@gmail.com> on 2020/10/12 15:22:06 UTC

Count based triggers and latency

Hi All,

I'm building a pipeline that processes events as they arrive, and I don't
really care about event time or the watermark. I'm more interested in not
discarding events and in reducing latency. The downstream part of the pipeline
has a stateful DoFn. I understand that the default windowing strategy is
GlobalWindows. What I did not completely understand is the default trigger:
according to
https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.html
it is Repeatedly.forever(AfterWatermark.pastEndOfWindow()). How does this work
for the global window, given that it has no end of window?

My source is Google PubSub and the pipeline runs on the Dataflow runner. I
have defined my window transform as follows:

input.apply(TRANSFORM_NAME, Window.<T>into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
    .withAllowedLateness(Duration.standardDays(1))
    .discardingFiredPanes());


A couple of questions:

   1. Is triggering after every element inefficient, given that state is
   persisted (serialized) after each element? And in terms of parallelism,
   doesn't triggering after each element amount to serial execution?
   2. How does Dataflow parallelize with triggers like this?


Thanks, I appreciate any responses.

Kishore

Re: Count based triggers and latency

Posted by Rui Wang <ru...@google.com>.
On Mon, Oct 12, 2020 at 9:23 PM KV 59 <kv...@gmail.com> wrote:

> Thanks for your responses.
>
> I have a follow-up question. When you say
>
>> elementCountAtLeast means that the runner can buffer as many as it wants
>> and can decide to offer a low latency pipeline by triggering often or
>> better throughput through the use of buffering.
>
>
> Does this mean that I, as the pipeline developer, cannot control how often
> the runner triggers?
>

(Please correct me if I am wrong.)

Yes. As I recall, once the trigger conditions are met the runner is allowed
to fire the trigger, but the runner chooses when to actually fire it.
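To make the difference between "may fire" and "does fire" concrete, here is a toy model in plain Java. It is not Beam's API and not how Dataflow actually schedules work. `ToyCountTrigger`, `firePanes`, and the `runnerBatch` parameter are hypothetical. `minCount` plays the role of `elementCountAtLeast(n)` with discarding panes, and `runnerBatch` stands in for however many elements the runner decides to buffer before it fires.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Beam's API) of
// Repeatedly.forever(AfterPane.elementCountAtLeast(minCount)) with discarding panes.
// minCount only says when the runner MAY fire; runnerBatch is a stand-in for the
// runner's own choice of how many elements to buffer before it actually fires.
class ToyCountTrigger {

    static List<List<Integer>> firePanes(List<Integer> elements, int minCount, int runnerBatch) {
        // The runner never fires before the condition holds, but it may wait longer.
        int fireAt = Math.max(minCount, runnerBatch);
        List<List<Integer>> panes = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        for (int element : elements) {
            buffer.add(element);
            if (buffer.size() >= fireAt) {
                // Discarding mode: emit what is buffered and start a fresh pane.
                panes.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        // In this toy model, anything left in the buffer simply stays pending.
        return panes;
    }

    public static void main(String[] args) {
        List<Integer> events = List.of(1, 2, 3, 4, 5, 6, 7);
        // Low-latency choice: prints [[1], [2], [3], [4], [5], [6], [7]]
        System.out.println(firePanes(events, 1, 1));
        // Buffering choice: prints [[1, 2, 3], [4, 5, 6]] (element 7 still pending)
        System.out.println(firePanes(events, 1, 3));
    }
}
```

Both runs satisfy `elementCountAtLeast(1)`, because every pane has at least one element. That is why the trigger alone does not pin down how often panes are emitted.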

Re: Count based triggers and latency

Posted by KV 59 <kv...@gmail.com>.
Thanks for your responses.

I have a follow-up question. When you say

> elementCountAtLeast means that the runner can buffer as many as it wants
> and can decide to offer a low latency pipeline by triggering often or
> better throughput through the use of buffering.


Does this mean that I, as the pipeline developer, cannot control how often
the runner triggers?

Kishore

Re: Count based triggers and latency

Posted by Kenneth Knowles <ke...@apache.org>.
Another thing to keep in mind (apologies if this was already clear):
triggering governs aggregation (GBK / Combine). It does not have any effect
on a stateful DoFn.
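Triggers do not affect a stateful DoFn, so any batching at that stage has to come from the DoFn's own per-key state. The sketch below is a hypothetical plain-Java model of that idea. `ToyStatefulBatcher` and `process` are made-up names, not Beam's State API. In an actual Beam DoFn this buffer would live in a state cell such as a `BagState`. You would typically also set a timer to flush partially filled batches; that part is omitted here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, not Beam's State API) of a stateful DoFn that batches
// per key. No trigger is involved: the DoFn's own state decides when to emit.
class ToyStatefulBatcher {
    private final int batchSize;
    // Stand-in for per-key buffered state (a BagState would play this role in Beam).
    private final Map<String, List<String>> bufferByKey = new HashMap<>();

    ToyStatefulBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Processes one element; returns a full batch for its key, or an empty list. */
    List<String> process(String key, String value) {
        List<String> buffer = bufferByKey.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < batchSize) {
            return List.of(); // keep buffering for this key
        }
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear(); // this key's state starts over after emitting
        return batch;
    }

    public static void main(String[] args) {
        ToyStatefulBatcher batcher = new ToyStatefulBatcher(2);
        System.out.println(batcher.process("a", "a1")); // prints []
        System.out.println(batcher.process("b", "b1")); // prints []
        System.out.println(batcher.process("a", "a2")); // prints [a1, a2]
    }
}
```

Setting `batchSize` to 1 in this model gives per-element output with no dependence on any trigger. That mirrors the point that latency at a stateful DoFn is governed by the DoFn's own logic.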

Re: Count based triggers and latency

Posted by Luke Cwik <lc...@google.com>.
The default trigger will only fire when the global window closes. That does
happen: with sources whose watermark goes past GlobalWindow.MAX_TIMESTAMP, or,
in streaming, during a pipeline drain, which emits partial results. Bounded
sources commonly advance their watermark to the end of time when they complete,
and some unbounded sources can stop producing output if they detect the end.
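A toy model makes this concrete. The end of the global window is effectively unreachable for a source like Pub/Sub, so the default trigger does not fire. A completed bounded source, by contrast, jumps its watermark to the end of time and produces exactly one firing. The class name and both constants are made-up stand-ins, not Beam's actual values. As far as I know, the Beam Java SDK places the end of the global window slightly before its maximum timestamp, which is the relationship modeled here. Pipeline drain is not modeled.

```java
// Toy model (hypothetical constants, not Beam's real values) of the default trigger,
// Repeatedly.forever(AfterWatermark.pastEndOfWindow()), on the global window:
// it fires only once the watermark passes the end of the window.
class ToyDefaultTrigger {
    // Stand-in for the "end of time" watermark reported by a finished bounded source.
    static final long END_OF_TIME = Long.MAX_VALUE;
    // Stand-in for the end of the global window, assumed to sit just before END_OF_TIME.
    static final long GLOBAL_WINDOW_END = END_OF_TIME - 1;

    /** Counts how many times the trigger fires as the watermark advances. */
    static int countFirings(long[] watermarkUpdates) {
        int firings = 0;
        boolean windowClosed = false;
        for (long watermark : watermarkUpdates) {
            if (!windowClosed && watermark > GLOBAL_WINDOW_END) {
                firings++; // the single on-time pane for the global window
                windowClosed = true; // nothing more can arrive for a closed window
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // Streaming source: the watermark keeps moving but never reaches the end.
        System.out.println(countFirings(new long[] {1_000L, 2_000L, 3_000L})); // prints 0
        // Bounded source that completed: the watermark jumps to the end of time.
        System.out.println(countFirings(new long[] {1_000L, 2_000L, END_OF_TIME})); // prints 1
    }
}
```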

Parallelization for stateful DoFns is per key and window. Parallelization
for GBK is per key and window pane. Note that elementCountAtLeast means
that the runner can buffer as many elements as it wants: it can decide to
offer a low-latency pipeline by triggering often, or better throughput
through the use of buffering.
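Per-key-and-window parallelization can be illustrated with another toy model, which also bears on question 2. It uses hypothetical names and plain Java rather than Beam. Elements are grouped into independent units of work by (key, window). With `GlobalWindows`, every element shares the single window, so in this model the number of distinct keys, not the trigger frequency, bounds how many units can proceed in parallel. The GBK case, which additionally splits work by pane, is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (hypothetical, not Beam's API): elements are grouped into independent
// units of work keyed by (key, window), as described for stateful DoFns.
class ToyKeyWindowPartitioner {

    static final class Element {
        final String key;
        final String window;
        final int value;

        Element(String key, String window, int value) {
            this.key = key;
            this.window = window;
            this.value = value;
        }
    }

    /** Groups values by "key|window"; each group could be processed independently. */
    static Map<String, List<Integer>> workUnits(List<Element> elements) {
        Map<String, List<Integer>> units = new TreeMap<>(); // sorted for stable output
        for (Element e : elements) {
            units.computeIfAbsent(e.key + "|" + e.window, k -> new ArrayList<>()).add(e.value);
        }
        return units;
    }

    public static void main(String[] args) {
        List<Element> events = List.of(
            new Element("user-1", "global", 1),
            new Element("user-2", "global", 2),
            new Element("user-1", "global", 3),
            new Element("user-3", "global", 4));
        // Three keys in the one global window give three independent units of work.
        // Prints {user-1|global=[1, 3], user-2|global=[2], user-3|global=[4]}
        System.out.println(workUnits(events));
    }
}
```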