Posted to user@flink.apache.org by Harshvardhan Agrawal <ha...@gmail.com> on 2018/07/22 15:59:47 UTC

Behaviour of triggers in Flink

Hi,

I have been trying to understand how triggers work in Flink. We have a set
of data that arrives via Kafka. We need to process the data in a window
whenever either of these two criteria is met:
1) The maximum number of elements in the window has been reached.
2) A maximum amount of time has elapsed (say, 5 milliseconds in our case).

I have written the following code:

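import com.google.common.base.Joiner;
import java.util.stream.LongStream;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // Bounded source emitting 0..101
        DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                LongStream.range(0, 102).forEach(ctx::collect);
            }

            @Override
            public void cancel() {
                // nothing to cancel: run() finishes on its own
            }
        });

        // 5 ms windows whose default trigger is replaced by a purging count trigger
        source.timeWindowAll(Time.milliseconds(5))
              .trigger(PurgingTrigger.of(CountTrigger.of(15)))
              .apply(new AllWindowFunction<Long, Object, TimeWindow>() {
                  @Override
                  public void apply(TimeWindow timeWindow, Iterable<Long> values,
                                    Collector<Object> collector) throws Exception {
                      System.out.println("processing a window");
                      System.out.println(Joiner.on(',').join(values));
                  }
              }).print();

        env.execute("test-program");
    }
}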

Here is the output I get when I run this code:

processing a window
0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
processing a window
15,16,17,18,19,20,21,22,23,24,25,26,27,28,29
processing a window
30,31,32,33,34,35,36,37,38,39,40,41,42,43,44
processing a window
45,46,47,48,49,50,51,52,53,54,55,56,57,58,59
processing a window
60,61,62,63,64,65,66,67,68,69,70,71,72,73,74
processing a window
75,76,77,78,79,80,81,82,83,84,85,86,87,88,89

As you can see, the data from 90 to 101 is not processed. Shouldn't it be
processed when the window completes after 5 ms?

When I remove the trigger part, all of the data from 0 to 101 does get
processed.

Any idea why we see this behaviour?
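The arithmetic behind the missing tail can be replayed without Flink. Once trigger() replaces the window's default trigger, the window only fires when 15 elements have accumulated, and CountTrigger returns CONTINUE from its time callbacks, so the end of the window does not cause a firing. With 102 elements that gives 6 firings (6 × 15 = 90, i.e. 0 to 89) and 12 elements that never reach the count. The class CountOnlyWindowSim below is a hypothetical, simplified model (not Flink code) that assumes all 102 elements land in one window.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-free model of PurgingTrigger.of(CountTrigger.of(maxCount))
 * applied to the elements of a single window.
 */
public class CountOnlyWindowSim {

    /** Returns the batches that would be emitted; a partial batch at window end is dropped. */
    public static List<List<Long>> emittedBatches(List<Long> windowElements, int maxCount) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        for (long e : windowElements) {
            buffer.add(e);
            if (buffer.size() == maxCount) {
                // count reached: the window fires, then the purging wrapper empties it
                fired.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        // Window end: a count-only trigger ignores time, so the remaining buffer
        // is discarded together with the window state instead of being emitted.
        return fired;
    }

    public static void main(String[] args) {
        List<Long> input = new ArrayList<>();
        for (long i = 0; i < 102; i++) {
            input.add(i);
        }
        List<List<Long>> batches = emittedBatches(input, 15);
        System.out.println(batches.size() + " firings, last batch " + batches.get(batches.size() - 1));
    }
}
```

Running it reports 6 firings whose last batch ends at 89, the same shape as the output above.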
-- 

Regards, Harshvardhan Agrawal
267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>

Re: Behaviour of triggers in Flink

Posted by Harshvardhan Agrawal <ha...@gmail.com>.
Thanks for the response, Hequn. I also see weird behavior with the purging
trigger: it skips messages.

Here is the repro:

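import com.google.common.base.Joiner;
import java.util.stream.LongStream;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // Bounded source emitting 0..100
        DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                LongStream.range(0, 101).forEach(ctx::collect);
            }

            @Override
            public void cancel() {
                // nothing to cancel: run() finishes on its own
            }
        });

        // Same pipeline as before, but firing every 7 elements
        source.timeWindowAll(Time.milliseconds(5))
              .trigger(PurgingTrigger.of(CountTrigger.of(7)))
              .apply(new AllWindowFunction<Long, Object, TimeWindow>() {
                  @Override
                  public void apply(TimeWindow timeWindow, Iterable<Long> values,
                                    Collector<Object> collector) throws Exception {
                      System.out.println("processing a window");
                      System.out.println(Joiner.on(',').join(values));
                  }
              }).print();

        env.execute("test-program");
    }
}

Here is the output:
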
processing a window
0,1,2,3,4,5,6
processing a window
10,11,12,13,14,15,16
processing a window
17,18,19,20,21,22,23
processing a window
24,25,26,27,28,29,30
processing a window
31,32,33,34,35,36,37
processing a window
38,39,40,41,42,43,44
processing a window
45,46,47,48,49,50,51
processing a window
52,53,54,55,56,57,58
processing a window
59,60,61,62,63,64,65
processing a window
66,67,68,69,70,71,72
processing a window
73,74,75,76,77,78,79
processing a window
80,81,82,83,84,85,86
processing a window
87,88,89,90,91,92,93
processing a window
94,95,96,97,98,99,100

It has skipped numbers 7-9. Is this expected behavior?
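The gap can be explained the same way, under an assumption about timestamps. CountTrigger keeps a separate count per window and resets it after each firing. If elements 0 to 9 were stamped into the first 5 ms window and 10 to 100 into the next (assumed here; the real split depends on ingestion time), the first window fires once for 0 to 6 and then holds 7, 8 and 9, which never reach a count of 7 and are discarded when the window ends. The second window holds 91 = 13 × 7 elements, so it fires 13 times with nothing left over. PerWindowCountSim is a hypothetical, Flink-free replay of that reasoning, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongToIntFunction;

/** Hypothetical replay of a count-only purging trigger across several time windows. */
public class PerWindowCountSim {

    /**
     * Feeds elements from..to-1 in order. windowOf says which window each element
     * lands in (an assumption standing in for ingestion-time window assignment).
     * Returns the emitted batches; leftovers in any window are silently dropped.
     */
    public static List<List<Long>> run(long from, long to, LongToIntFunction windowOf, int maxCount) {
        Map<Integer, List<Long>> buffers = new LinkedHashMap<>();
        List<List<Long>> fired = new ArrayList<>();
        for (long e = from; e < to; e++) {
            List<Long> buf = buffers.computeIfAbsent(windowOf.applyAsInt(e), w -> new ArrayList<>());
            buf.add(e);
            if (buf.size() == maxCount) {
                // this window's count reached maxCount: fire, purge, restart the count
                fired.add(new ArrayList<>(buf));
                buf.clear();
            }
        }
        // All windows end here; nothing fires on time, so non-empty buffers are lost.
        return fired;
    }

    public static void main(String[] args) {
        // Assumption: 0..9 fall into the first 5 ms window, 10..100 into the second.
        List<List<Long>> out = run(0, 101, e -> e < 10 ? 0 : 1, 7);
        out.forEach(System.out::println);
    }
}
```

Under that assumed split it prints 14 batches, from [0..6] and then [10..16] through [94..100], and 7, 8 and 9 never appear, which matches the output above.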

On Sun, Jul 22, 2018 at 9:43 PM Hequn Cheng <ch...@gmail.com> wrote:

> Hi Harshvardhan,
>
> By specifying a trigger using trigger() you are overwriting the default
> trigger of a WindowAssigner. For example, if you specify a CountTrigger for
> TumblingEventTimeWindows you will no longer get window firings based on the
> progress of time but only by count. Right now, you have to write your own
> custom trigger if you want to react based on both time and count.
> More details here[1].
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#default-triggers-of-windowassigners

-- 
Regards, Harshvardhan Agrawal

Re: Behaviour of triggers in Flink

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Harshvardhan,

By specifying a trigger using trigger() you are overwriting the default
trigger of a WindowAssigner. For example, if you specify a CountTrigger for
TumblingEventTimeWindows you will no longer get window firings based on the
progress of time but only by count. Right now, you have to write your own
custom trigger if you want to react based on both time and count.
More details here[1].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#default-triggers-of-windowassigners
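A custom trigger that reacts to both count and time, as suggested above, could look roughly like the sketch below. It is a minimal sketch, not an official Flink class: CountOrTimeTrigger is a hypothetical name, and it assumes the Trigger API of Flink around the time of this thread (1.5.x) and event-time or ingestion-time windows, as in the posted code, where timeWindowAll is driven by event-time timers and automatically generated ingestion-time watermarks. It combines a per-window counter in the style of CountTrigger with an end-of-window event-time timer in the style of EventTimeTrigger.

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Hypothetical trigger: fires when maxCount elements arrive OR when the window ends. */
public class CountOrTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;

    // Per-window element counter, same approach as Flink's CountTrigger
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public CountOrTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // Time criterion: get a callback once the watermark passes the window end
        ctx.registerEventTimeTimer(window.maxTimestamp());

        // Count criterion: fire and purge every maxCount elements
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (time == window.maxTimestamp()) {
            // Window is complete: emit whatever is left, even if fewer than maxCount
            ctx.getPartitionedState(countDesc).clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countDesc).clear();
    }

    @Override
    public String toString() {
        return "CountOrTimeTrigger(" + maxCount + ")";
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}
```

In the original pipeline it would take the place of the count trigger, e.g. source.timeWindowAll(Time.milliseconds(5)).trigger(new CountOrTimeTrigger(15)); no PurgingTrigger wrapper is needed because the sketch already returns FIRE_AND_PURGE. Note that here the "max time" is the end of the 5 ms window, not a timeout measured from the first element in a batch.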
