Posted to user@flink.apache.org by Dominik Bruhn <do...@dbruhn.de> on 2016/10/04 12:54:30 UTC

TimeWindow Trigger which only fires when the values have changed

Hi,
I'm heavily relying on TimeWindows for my real time processing. Roughly 
my job consumes from an AMQP queue, computes some time buckets and saves 
the time-buckets to Cassandra.

I found the EventTimeTriggerWithEarlyAndLateFiring [1] class which 
already helped me a lot: Even with long time-windows, I can get 
intermediate values already saved to Cassandra by using the earlyFiring 
(and setting "accumulating" to true).

My question is: Would it be possible to only fire the trigger if 
the value of the TimeBucket has changed? What I actually want is only 
writing to Cassandra if there is actually something different in the 
time bucket.

And, as a side question: Why is something like the 
EventTimeTriggerWithEarlyAndLateFiring not in the default Flink 
distribution? It seems very handy.

[1]: 
https://github.com/dataArtisans/beam_comp/blob/master/src/main/java/com/dataartisans/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java


Thanks,
Dominik

Re: TimeWindow Trigger which only fires when the values have changed

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Dominik,

To only fire when new elements have arrived, you should modify your EventTimeTriggerWithEarlyAndLateFiring to detect that
more elements have arrived since the last firing.

To do so, you should add some extra state, e.g. a ValueStateDescriptor<Boolean>, that you set to true in onElement() and reset to false whenever any of the on* methods returns TriggerResult.FIRE. With this change, the decision to fire should be based on the same criteria as before, plus the condition that the newly introduced state is true (meaning new data has arrived since the last time you fired).
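
As a rough illustration of that rule, here is a minimal plain-Java model of the flag logic. It deliberately uses no Flink classes: ChangeAwareTrigger, Result, hasNewData and the wouldFire parameter are hypothetical names made up for this sketch. In a real trigger the boolean would live in partitioned state obtained through the trigger context, and wouldFire stands for whatever the existing early/on-time/late logic already decides. The sketch also assumes that firing only happens from timers, which may not hold for late elements.

```java
/** Plain-Java model of the "fire only if something changed" rule; not a Flink Trigger. */
final class ChangeAwareTrigger {

    /** Hypothetical stand-in for TriggerResult, reduced to the two outcomes needed here. */
    enum Result { CONTINUE, FIRE }

    // Stand-in for the Boolean state kept per key and window.
    private boolean hasNewData = false;

    /** Called for every element added to the window: remember that the contents changed. */
    Result onElement() {
        hasNewData = true;
        return Result.CONTINUE;
    }

    /**
     * Called when an early, on-time, or late timer goes off.
     * wouldFire is the decision the unmodified trigger would have made.
     */
    Result onTimer(boolean wouldFire) {
        if (wouldFire && hasNewData) {
            hasNewData = false; // reset, so the next timer stays quiet unless new data arrives
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }
}
```

With this model, two timers in a row without an element in between produce at most one FIRE, so the sink would not see the same bucket value written twice.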

As for making a trigger with early and late firings part of the main Flink distribution: this is already close to landing in the
master branch, as there is an open pull request. For more, you can follow the discussion on the Trigger DSL on the dev
mailing list and here: https://cwiki.apache.org/confluence/display/FLINK/FLIP-9%3A+Trigger+DSL

Thanks,
Kostas



Re: TimeWindow Trigger which only fires when the values have changed

Posted by Till Rohrmann <tr...@apache.org>.
Hi Dominik,

you could extend the EventTimeTriggerWithEarlyAndLateFiring trigger to
store for each key whether you’ve seen a new element since the last firing
or not. When firing, you can reset that state to mark the window as already
fired. For that you can use TriggerContext.getPartitionedState.

The community is currently reworking Flink’s triggers to provide a
trigger DSL [1]. With this DSL you’ll be able to specify a trigger like the
one you’ve found with EventTimeTriggerWithEarlyAndLateFiring.

Cheers,
Till
