You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Simone Cavallarin <ca...@hotmail.com> on 2020/12/13 12:11:46 UTC

How to Implement a simple boolean .trigger()

Hi All,

I'm trying to understand how to create a sample trigger. Let's say that I have a stream like this one:

Event: "YELLOW, BLUE, WHITE, RED, GREEN, RED, GREEN, RED, YELLOW, YELLOW"
Event: "YELLOW, BLUE, BLACK, RED,BLUE, RED, PINK, RED, YELLOW, YELLOW"

My stream is then mapped, and I produce an enriched Stream with an additional Boolean variable that define when is the moment to trigger the window. (I'm using this twostep approach to also calculate a value for a dynamic session window parameter, but on this example to simplify I cut it out, it would be more precise to say that I should use a Tuple3). In this specific example if two consecutive messages with 2 "yellow" are seen MyFunctionToAddTheBoolean() will emit a "true".

DataStream<Tuple2<Event, Boolean>> Enriched = stream
                .keyBy(...)
                .map(new MyFunctionToAddTheBoolean());

Enriched:Tuple2<stream_A, Boolean>

With this new stream called "Enriched", I'm going to move on to the second step where I would like to use the parameter to trigger the window processing.

DataStream<String> WinStream = enriched
                .keyBy(new MyKeySelector())
                .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()))
                .trigger(MySuperTriggerFunction()_?)
                .process(new MyProcessWindowFunction());

My questions are:

1)Would be possible to have an example on how to write a function (MySuperTriggerFunction().)that can do this?
2)How the DynamicSessionWindows() and MySuperTriggerFunction() can work together, when on the DynamicSessionWindows() I'm giving to Flink an indication to process my data if the gap is greater than '1000' millis, but on the other hand I'm also giving a trigger(). Would the application be able to follow both and run a processWindowFunction if either of the two are respected, or do I have to decide which one of the two should be used? or prioritise?


I have been reading:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
Where i learn that: A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).

And:

https://gist.github.com/mxm/c5831ead9c9d9ad68731f5f2f3793154


But still... Some help would be really appreciated!

Thanks!

Simone