Posted to user@beam.apache.org by Eleanore Jin <el...@gmail.com> on 2021/10/17 21:31:12 UTC

Define custom trigger

Hi community,

Here is my use case:
My pipeline uses another Kafka topic, which contains the filter criteria,
as a side input. When processing the main stream, the pipeline checks
whether each message from the main stream matches *any* of the existing
filter criteria.

The side input logic is: whenever at least 1 element arrives from the side
input topic, fire, and accumulate all the elements seen so far.

Trigger trigger = Repeatedly.forever(
  AfterFirst.of(
    AfterPane.elementCountAtLeast(1),
    AfterProcessingTime.pastFirstElementInPane()
  ));

return kafkaValues.apply("sideInputFromTopic-" + topicName,
    Window.<KV<String, T>>into(new GlobalWindows())
        .triggering(trigger)
        .accumulatingFiredPanes()
        .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS));


Here is my problem:
When the application restarts, the 1st message from the main stream matches
the 3rd element in the side input. But because the pane fires as soon as
the 1st side input element is consumed, the 1st main-stream message was
marked as not passing the filter.

Then I switched to the trigger below, which fires 5 seconds after reading
the first element of a pane from the side input (or once 3000 elements
have arrived). This works around the restart problem, but every element
published to the side input afterwards also has to wait up to 5 seconds
before the pane fires, so some main-stream messages can be incorrectly
marked as not passing the filters.

Trigger trigger = Repeatedly.forever(
      AfterFirst.of(
        AfterPane.elementCountAtLeast(3000),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))
      ));

Question:

I want to define a custom trigger that, the first time it fires, waits for,
say, 5-10 seconds before the main stream is processed.

After that, it should fire as soon as it sees a new element from the side
input.

Is this possible? Or can I leverage an existing API to do it?


Beam version: 2.23, Flink runner: 1.10.2

Thanks a lot!

Eleanore

Re: Define custom trigger

Posted by Luke Cwik <lc...@google.com>.
Forgot a key "return" in processElement, right after buffering the element:

processElement(...) {
  if (value state is false) {
    if (bag state is empty) {
      schedule processing timer for 5 seconds from now
    }
    buffer element in bag state
    return
  }
  output element
}

onTimer(...) {
  output everything in bag state buffer
  set value state to true
}
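To make the ordering concrete, here is a minimal plain-Java model of the pseudocode above. It is not Beam code: the bag state, value state, and processing-time timer are stand-ins (plain fields), time is passed in explicitly as milliseconds, and the class name `InitialDelayGate` and the `returnAfterBuffering` flag are hypothetical. Setting the flag to `false` reproduces the earlier version without the `return`, where every element seen before the timer fires is emitted twice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (no Beam) of the buffer-then-pass-through logic.
// Beam state and timers are replaced by fields; processing time is passed in explicitly.
class InitialDelayGate<T> {
  private final long delayMillis;
  private final boolean returnAfterBuffering; // false models the version without "return"
  private final List<T> buffer = new ArrayList<>(); // stands in for the bag state
  private boolean released = false;                 // stands in for the value state
  private Long timerAtMillis = null;                // stands in for the processing-time timer

  InitialDelayGate(long delayMillis, boolean returnAfterBuffering) {
    this.delayMillis = delayMillis;
    this.returnAfterBuffering = returnAfterBuffering;
  }

  // Models onTimer: once the timer time is reached, flush the buffer and open the gate.
  List<T> fireTimerIfDue(long nowMillis) {
    List<T> out = new ArrayList<>();
    if (timerAtMillis != null && nowMillis >= timerAtMillis) {
      out.addAll(buffer);
      buffer.clear(); // not in the pseudocode; only frees the buffer after release
      released = true;
      timerAtMillis = null;
    }
    return out;
  }

  // Models processElement; returns everything emitted while handling this element.
  // Modeling choice: a timer that is already due fires before the new element is handled.
  List<T> processElement(T element, long nowMillis) {
    List<T> out = fireTimerIfDue(nowMillis);
    if (!released) {
      if (buffer.isEmpty()) {
        timerAtMillis = nowMillis + delayMillis; // schedule the timer on the first buffered element
      }
      buffer.add(element);
      if (returnAfterBuffering) {
        return out; // corrected version: buffered elements are emitted only by the timer
      }
    }
    out.add(element); // pass-through (or, without the return, an extra early emission)
    return out;
  }
}
```

In an actual pipeline this logic would presumably sit in a stateful `DoFn` on the keyed main stream, ahead of the `ParDo` that reads the side input. Beam state and timers are scoped per key and window, so the input has to be a `KV` PCollection and each key gets its own 5-second buffering period.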

On Mon, Oct 18, 2021 at 5:13 PM Eleanore Jin <el...@gmail.com> wrote:

> Thanks a lot Luke, I will try it out!
>
> Eleanore
>
> On Mon, Oct 18, 2021 at 9:41 AM Luke Cwik <lc...@google.com> wrote:
>
>> You could use a stateful DoFn and buffer the first message and everything
>> that you see for the first 5 seconds and then afterwards pass everything
>> through. Something like:
>>
>> processElement(...) {
>>   if (value state is false) {
>>     if (bag state is empty) {
>>       schedule processing timer for 5 seconds from now
>>     }
>>     buffer element in bag state
>>   }
>>   output element
>> }
>>
>> onTimer(...) {
>>   output everything in bag state buffer
>>   set value state to true
>> }
>>

Re: Define custom trigger

Posted by Eleanore Jin <el...@gmail.com>.
Thanks a lot Luke, I will try it out!

Eleanore

Re: Define custom trigger

Posted by Luke Cwik <lc...@google.com>.
You could use a stateful DoFn and buffer the first message and everything
that you see for the first 5 seconds and then afterwards pass everything
through. Something like:

processElement(...) {
  if (value state is false) {
    if (bag state is empty) {
      schedule processing timer for 5 seconds from now
    }
    buffer element in bag state
  }
  output element
}

onTimer(...) {
  output everything in bag state buffer
  set value state to true
}