Posted to user@flink.apache.org by jad mad <ja...@gmail.com> on 2017/07/12 12:47:13 UTC

global window trigger

I have a global window with a custom event-time trigger that fires
every minute, and I apply a custom window function to it.

The trigger seems to fire correctly, but the element collection
I get inside my custom WindowFunction is always the whole input
from start to end, rather than the subset from the start up to
the end of each 1-minute interval (maxTimestamp).

Is this because GlobalWindows is a processing-time operator that
does not work with event time?

thanks a lot,

Re: global window trigger

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Yes, you can have state in a WindowFunction if you use Flink’s state abstraction that you can access from a RichWindowFunction using the RuntimeContext. (Or by using a ProcessWindowFunction).

Trigger purging behaviour makes a difference if the Trigger fires repeatedly before the watermark reaches the end of the window, for example with a trigger that speculatively fires early. In those cases it can make sense to distinguish between firing and purging (FIRE_AND_PURGE) and just firing (FIRE), depending on whether you want only those elements that have accumulated since the last trigger firing or all accumulated window contents.
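
To make the difference concrete, here is a minimal plain-Java sketch of a window buffer. It is only a model: `WindowBufferModel` and its methods are hypothetical names and it does not use the Flink API. With purge-on-fire switched off (like FIRE) each firing sees everything accumulated so far; with it switched on (like FIRE_AND_PURGE) each firing sees only what arrived since the previous firing.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a window buffer (hypothetical, not Flink code). */
final class WindowBufferModel {
    private final List<Long> buffer = new ArrayList<>();
    private final boolean purgeOnFire;

    WindowBufferModel(boolean purgeOnFire) {
        this.purgeOnFire = purgeOnFire;
    }

    /** Buffers one element, represented here only by its timestamp. */
    void add(long timestamp) {
        buffer.add(timestamp);
    }

    /** Returns a copy of what the window function would see at this firing. */
    List<Long> fire() {
        List<Long> seen = new ArrayList<>(buffer);
        if (purgeOnFire) {
            buffer.clear(); // models FIRE_AND_PURGE: drop contents after emitting
        }
        return seen;
    }

    public static void main(String[] args) {
        WindowBufferModel fireOnly = new WindowBufferModel(false);
        WindowBufferModel fireAndPurge = new WindowBufferModel(true);
        fireOnly.add(1L);
        fireOnly.add(2L);
        fireAndPurge.add(1L);
        fireAndPurge.add(2L);
        // First firing: both variants see the two buffered elements.
        System.out.println(fireOnly.fire().size() + " vs " + fireAndPurge.fire().size());
        fireOnly.add(3L);
        fireAndPurge.add(3L);
        // Second firing: the non-purging buffer is cumulative, the purging one is not.
        System.out.println(fireOnly.fire().size() + " vs " + fireAndPurge.fire().size());
    }
}
```

For a cumulative count since the start, the non-purging variant is the one that matches.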

GlobalWindows is not implemented by setting the allowed lateness very high. It is a WindowAssigner that assigns Long.MAX_VALUE as the max window timestamp, so the watermark will never pass the end of that GlobalWindow.

Regarding your use case: since you want to keep all data since the start, I would suggest using GlobalWindows, a custom Trigger that fires periodically, and a ProcessWindowFunction. In the ProcessWindowFunction you can make sure to process only those elements that you want to process, based on their timestamp and the current event time, which you can access from a ProcessWindowFunction.
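
The filtering step can be sketched in plain Java, independent of Flink. This is a model under assumptions: `CumulativeCountModel`, `countUpTo` and `sampleInput` are hypothetical names, the buffered elements are reduced to their timestamps, and the firing time is passed in explicitly instead of being read from a window function context. The sample timestamps are the four input records from the example quoted further down in this thread.

```java
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of "count only elements up to the firing time" (not Flink code). */
final class CumulativeCountModel {

    /** Counts buffered timestamps that are not after the firing time. */
    static long countUpTo(List<LocalDateTime> buffered, LocalDateTime firingTime) {
        return buffered.stream()
                .filter(ts -> !ts.isAfter(firingTime))
                .count();
    }

    /** The four input timestamps from the example in this thread. */
    static List<LocalDateTime> sampleInput() {
        return Arrays.asList(
                LocalDateTime.parse("2017-07-16T00:00:01"),
                LocalDateTime.parse("2017-07-16T00:00:12"),
                LocalDateTime.parse("2017-07-16T01:03:06"),
                LocalDateTime.parse("2017-07-16T02:20:10"));
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.parse("2017-07-16T00:00:00");
        // Even if the whole input is already buffered, each hourly firing
        // only counts what belongs to [start, firingTime].
        for (int hour = 1; hour <= 3; hour++) {
            LocalDateTime firingTime = start.plusHours(hour);
            System.out.println(start + " ~ " + firingTime + ", "
                    + countUpTo(sampleInput(), firingTime));
        }
    }
}
```

With the whole input buffered, the three hourly firings count 2, 3 and 4 elements, which is the cumulative result asked for in the thread.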

If you don’t want to keep all events indefinitely (which could eventually blow up your state size) you can use an Evictor to sometimes evict certain events from the window buffers.
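
As a rough illustration of what time-based eviction does to the buffer, here is a plain-Java sketch. It is a model only: `EvictionModel` and `evictOlderThan` are hypothetical names, elements are reduced to timestamps in milliseconds, and the rule "drop everything at or before newest minus a retention span" is an assumption chosen for the example, not a statement about any particular Evictor implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of time-based eviction from a window buffer (not Flink code). */
final class EvictionModel {

    /**
     * Removes every timestamp that is at or before (newest - keepMillis)
     * and returns how many elements were evicted.
     */
    static int evictOlderThan(List<Long> buffer, long keepMillis) {
        if (buffer.isEmpty()) {
            return 0;
        }
        long newest = Long.MIN_VALUE;
        for (long ts : buffer) {
            newest = Math.max(newest, ts);
        }
        final long cutoff = newest - keepMillis;
        int before = buffer.size();
        buffer.removeIf(ts -> ts <= cutoff);
        return before - buffer.size();
    }

    public static void main(String[] args) {
        List<Long> buffer = new ArrayList<>(Arrays.asList(1_000L, 2_000L, 10_000L));
        int evicted = evictOlderThan(buffer, 5_000L);
        System.out.println("evicted=" + evicted + " remaining=" + buffer);
    }
}
```

Note the trade-off: anything evicted is no longer available for a count "since the start", so a running total would have to be kept separately, for example in state.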

Best,
Aljoscha

> On 20. Jul 2017, at 12:24, jad mad <ja...@gmail.com> wrote:
> 
> Hello Aljoscha,
> 
> > I’m afraid this will not work well because a WindowAssigner should be stateless
> ok, now understand this.
> How about inside a custom WindowFunction(...), a bad idea to have states as well?
> 
> The default trigger for TumblingEventTimeWindows is the EventTimeTrigger.
> Looking at its source, there are a few "return TriggerResult.FIRE;" statements but not a single PURGE.
> Even so, the contents get cleared each time the time passes a window end.
> Is this what you meant by
> > When the watermark passes the end of a window plus the allowed lateness the window contents are being purged.
> ?
> If yes, the choice between "return TriggerResult.FIRE;" and "return TriggerResult.FIRE_AND_PURGE;"
> seems less important for a trigger implementation, because the contents will be cleared anyway,
> and the lateness amount is more important?
> And is GlobalWindows implemented by setting the "lateness" to a huge number
> so that it keeps everything in it?
> 
> so, back to my original question.
> in order to keep everything from start like a GlobalWindows, let it fire periodically and 
> then perform some calcs, what combination of window assigner, trigger, and/or custom window function
> I may use?  better if there'd be a simple working sample.
> 
> thank you a lot!
> jad
> 
> On Thu, Jul 20, 2017 at 5:45 PM, Aljoscha Krettek <aljoscha@apache.org <ma...@apache.org>> wrote:
> Hi,
> 
> I’m afraid this will not work well because a WindowAssigner should be stateless, i.e. it should not keep any state in fields. The reason is that there can be several WindowAssigners used on the different partitions and the order in which a WindowAssigner sees the incoming elements is also not guaranteed. That is, you might set a timestamp in the “first_timestamp” field that is not chronologically the “first timestamp”.
> 
> The reason for your windows being purged is probably the allowed lateness, which is zero by default. When the watermark passes the end of a window plus the allowed lateness the window contents are being purged. You can configure the allowed lateness via WindowedStream.allowedLateness(). You should be careful, though, because if you set this too high you might never clean up your window state and therefore have ever-growing state.
> 
> Best,
> Aljoscha
> 
>> On 18. Jul 2017, at 15:05, jad mad <jadmad0828@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Aljoscha,
>> 
>> what a great answer and this is what I'd expected!
>> 
>> As a workaround I've modified SlidingEventTimeWindows a little into a custom WindowAssigner, shown below.
>> The differences are:
>> 1. it stores the first timestamp in a variable "first_timestamp",
>> 2. it uses this timestamp as the start time of all following windows.
>> @PublicEvolving
>> public class MySlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
>>     private static final long serialVersionUID = 1L;
>>     private final long size;
>>     private final long slide;
>>     private final long offset;
>>     private long first_timestamp = -1L; // added by me!
>> 
>>     protected MySlidingEventTimeWindows(long size, long slide, long offset) {
>>         if(offset >= 0L && offset < slide && size > 0L) {
>>             this.size = size;
>>             this.slide = slide;
>>             this.offset = offset;
>>         } else {
>>             throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
>>         }
>>     }
>> 
>>     @Override
>>     public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
>>         if (timestamp == Long.MIN_VALUE) {
>>             throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
>>         }
>>         if (this.first_timestamp == -1L) { // added by me!
>>             this.first_timestamp = timestamp;
>>             System.out.println("===================== " + this.first_timestamp + " ========================");
>>         }
>>         List<TimeWindow> windows = new ArrayList<>((int) (this.size / this.slide));
>>         long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide);
>> 
>>         for (long start = lastStart; start > timestamp - this.size; start -= this.slide) {
>>             // windows.add(new TimeWindow(start, start + this.size)); // original implementation
>>             windows.add(new TimeWindow(this.first_timestamp, start + this.size)); // modified by me!
>>         }
>>         return windows;
>>     }
>>     // (the remaining WindowAssigner methods were not included in the post)
>> }
>> the result I get from MyWindowFunction(...) is like below : 
>> 2017-01-01 00:17:39	2017-01-01 00:00:01	2
>> 2017-01-01 00:17:39	2017-01-01 00:00:02	4
>> 2017-01-01 00:17:39	2017-01-01 00:00:03	4
>> 2017-01-01 00:17:39	2017-01-01 00:00:04	10
>> 2017-01-01 00:17:39	2017-01-01 00:00:05	19
>> 2017-01-01 00:17:39	2017-01-01 00:00:06	19
>> 2017-01-01 00:17:39	2017-01-01 00:00:07	20
>> 2017-01-01 00:17:39	2017-01-01 00:00:08	23
>> 2017-01-01 00:17:39	2017-01-01 00:00:09	21
>> 2017-01-01 00:17:39	2017-01-01 00:00:10	7
>> 2017-01-01 00:17:39	2017-01-01 00:00:11	2
>> 2017-01-01 00:17:39	2017-01-01 00:00:12	5
>> 2017-01-01 00:17:39	2017-01-01 00:00:13	12
>> 2017-01-01 00:17:39	2017-01-01 00:00:14	17
>> 2017-01-01 00:17:39	2017-01-01 00:00:15	9
>> 2017-01-01 00:17:39	2017-01-01 00:00:16	8
>> 
>> things I don't seem to understand are 
>> 1. when my inputs' first line time stamp is 2017-01-01 00:00:00 why is 2017-01-01 00:17:39 shown up in my result as 
>>      each sliding window's start time?
>>     basically, I'm just printing out the time stamp came with the first iterable object's element in MyWindowFunction.
>> 2. I made MyWindowAssigner in a hope that the starting time is fixed and the contents not being purged.
>>     however, from the results, we can see it works just as a normal EventTimeSlidingWindow with contents
>>     been purged.
>>     How can I make it not to throw away its window contents even after each time firing.
>> 3. this MyWindowAssigner(...) attempt arose as an effort based on your previous advice using a 
>>     different WindowFunction. wonder if I'm heading to the right direction or not.
>>     
>> thank you very much!
>> jad
>> 
>> On Mon, Jul 17, 2017 at 7:22 PM, Aljoscha Krettek <aljoscha@apache.org <ma...@apache.org>> wrote:
>> Ah, I see. The problem is that the watermark has slightly tricky semantics: A watermark T says that there will not be elements with a timestamp <= T in the future. It does not say, that there have not yet been elements with a timestamp > T. In your specific case, this means that there will be elements in the GlobalWindow that have a timestamp that is after the firing timestamp of your trigger. If you want to make sure that windows are somehow put into buckets, based on their timestamp then you need to use a different WindowFunction, because GlobalWindows simply puts every element into the same bucket (window).
>> 
>> Regarding the firing timestamp, it’s currently not possible to get that from within a WindowFunction.
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 16. Jul 2017, at 12:16, jad mad <jadmad0828@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> Hello Aljoscha,
>>> 
>>> thank you very much for your reply. the issue with me is two-fold.
>>> first of all, 
>>> the thing I wanted to achieve was having a GlobalWindows and let it fire 
>>> periodically, say 1 hour or 1 day, and then do some custom calculation.
>>> this custom trigger part I've implemented seems working well.
>>> 
>>> currently, when every time my custom trigger fires periodically, the elements of iterable object
>>> passed onto my custom WindowFunction contains whole inputs from the start to the end rather than
>>> from start to the timing(event time timestamp) where each time trigger fires.
>>> have been worked on this for a week now but not being able to find any solution yet.
>>> 
>>> input example. 
>>> 2017-07-16 00:00:01, x
>>> 2017-07-16 00:00:12, x
>>> 2017-07-16 01:03:06, x
>>> 2017-07-16 02:20:10, x
>>> 
>>> In this case, a GlobalWindows with 1-hour periodical trigger, designed to count the cumulative record in MyWindowFunction should emit something like
>>> 2017-07-16 00:00:00 ~ 2017-07-16 01:00:00, 2
>>> 2017-07-16 00:00:00 ~ 2017-07-16 02:00:00, 3
>>> 2017-07-16 00:00:00 ~ 2017-07-16 03:00:00, 4
>>> ↑ the start time stamp doesn't change!
>>> 
>>> now, what I get is like
>>> 2017-07-16 00:00:00 ~ , 4
>>> 2017-07-16 00:00:00 ~ , 4
>>> 2017-07-16 00:00:00 ~ , 4
>>> ↑every line the same results...
>>> 
>>> public class MyWindowFunction<T, W extends Window>  implements WindowFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple, W> {
>>> 
>>>     @Override
>>>     public void apply(Tuple tuple, W window, Iterable<Tuple2<String, String>> iterable, Collector<Tuple3<String, String, String>> out) throws Exception {
>>> 
>>>         for(Tuple2<String, String> element : iterable)
>>>         {
>>>             ...
>>>         }
>>>         out.collect(new Tuple3<String, String,  String>("...", "...", "..."));
>>>     }
>>> }
>>> Secondly, for a GlobalWindows firing periodically, how do you get the periodical firing time stamp inside of
>>> your MyWindowFunction? (the missing ~ part of ending time stamp in above example)
>>> 
>>> really appreciate the help!
>>> jad
>>> 
>>> 
>>> On Sun, Jul 16, 2017 at 6:15 PM, Aljoscha Krettek <aljoscha@apache.org <ma...@apache.org>> wrote:
>>> Hi,
>>> 
>>> Ok, then I misunderstood. Yes, a PurgingTrigger is similar (the same) to always returning FIRE_AND_PURGE instead of FIRE in a custom Trigger. I thought your problem was that data is never cleared away when using GlobalWindows. Is that not the case?
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>>> On 14. Jul 2017, at 16:29, jad mad <jadmad0828@gmail.com <ma...@gmail.com>> wrote:
>>>> 
>>>> Hi Aljoscha
>>>> 
>>>> thanks for the comment. 
>>>> is wrapping by a PurgingTrigger.of() the same as doing "return TriggerResult.FIRE_AND_PURGE;" 
>>>> inside of a custom trigger?
>>>> 
>>>> gave it a test and the result seems the opposite of what I meant...
>>>> instead of throwing away previous windows' contents, I wanna keep them
>>>> all the way till the end. 
>>>> that way I can get the cumulative counts of all input.
>>>> 
>>>> wonder how to achieve it.
>>>> anyone?
>>>> 
>>>> jad
>>>> 
>>>> 
>>>> On Fri, Jul 14, 2017 at 12:16 AM, Aljoscha Krettek <aljoscha@apache.org <ma...@apache.org>> wrote:
>>>> Window contents are only purged from state if the Trigger says so or if the watermark passes the garbage collection horizon for a given window. With GlobalWindows, the GC horizon is never reached, that leaves Triggers.
>>>> 
>>>> You can create a Trigger that purges every time it fires by wrapping it in a PurgingTrigger, i.e.
>>>> 
>>>> .trigger(PurgingTrigger.of(<my trigger>))
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>>> On 13. Jul 2017, at 14:00, jad mad <jadmad0828@gmail.com <ma...@gmail.com>> wrote:
>>>>> 
>>>>> Hi Prashant,
>>>>> 
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>> 
>>>>> actually I could make my custom trigger to fire periodically.
>>>>> The problem is the element set stored in the iterable variable 
>>>>> is always uniform which is not what I'm expecting...
>>>>> 
>>>>> private static class MyWindowFunction_Window...
>>>>>          ...    
>>>>>        @Override
>>>>>         public void apply(Tuple tuple, W window, Iterable<MyClass> iterable,
>>>>>              ...
>>>>>              for(MyClass element : iterable)
>>>>> 
>>>>> does anyone have any idea on this?
>>>>> thanks a lot in advance,
>>>>> jad
>>>>> 
>>>>> 
>>>>> On Thu, Jul 13, 2017 at 10:55 AM, prashantnayak <prashant@intellifylearning.com <ma...@intellifylearning.com>> wrote:
>>>>> Hi
>>>>> 
>>>>> We have custom operators using global windows and are using event time.
>>>>> 
>>>>> How are you specifying event time as the time characteristic?
>>>>> 
>>>>> Prashant
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/global-window-trigger-tp14206p14239.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/global-window-trigger-tp14206p14239.html>
>>>>> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com <http://nabble.com/>.
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
>> 
> 
> 


Re: global window trigger

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I’m afraid this will not work well because a WindowAssigner should be stateless, i.e. it should not keep any state in fields. The reason is that there can be several WindowAssigners used on the different partitions and the order in which a WindowAssigner sees the incoming elements is also not guaranteed. That is, you might set a timestamp in the “first_timestamp” field that is not chronologically the “first timestamp”.
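
The ordering problem can be reproduced with a tiny plain-Java model. This is a sketch, not Flink code: `FirstSeenModel` and `observe` are hypothetical names that only mirror the "first_timestamp" field of the custom assigner, and the arrival order is made up for illustration.

```java
/** Plain-Java model of an assigner that remembers the first timestamp it sees. */
final class FirstSeenModel {
    private long firstTimestamp = -1L; // mirrors the "first_timestamp" field

    /** Remembers the first observed timestamp and returns it on every call. */
    long observe(long timestamp) {
        if (firstTimestamp == -1L) {
            firstTimestamp = timestamp;
        }
        return firstTimestamp;
    }

    public static void main(String[] args) {
        // Hypothetical arrival order: a later event reaches this instance first.
        long[] arrivalOrder = {1_059_000L, 1_000L, 2_000L};
        FirstSeenModel assigner = new FirstSeenModel();
        long remembered = -1L;
        long smallest = Long.MAX_VALUE;
        for (long ts : arrivalOrder) {
            remembered = assigner.observe(ts);
            smallest = Math.min(smallest, ts);
        }
        // The remembered "first" timestamp is not the chronologically first one.
        System.out.println("remembered=" + remembered + " smallest=" + smallest);
    }
}
```

Each parallel instance would keep its own copy of the field, so different partitions can end up with different "first" timestamps.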

The reason for your windows being purged is probably the allowed lateness, which is zero by default. When the watermark passes the end of a window plus the allowed lateness the window contents are being purged. You can configure the allowed lateness via WindowedStream.allowedLateness(). You should be careful, though, because if you set this too high you might never clean up your window state and therefore have ever-growing state.
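
The purge rule can be written down as a small plain-Java model. It is a sketch under stated assumptions, not the operator's actual code: `CleanupTimeModel` and its methods are hypothetical names, a time window's max timestamp is taken to be its end minus one millisecond, "the watermark passes" is modeled as watermark >= cleanup time, and a cleanup time of Long.MAX_VALUE is treated as "never".

```java
/** Plain-Java model of when window contents are purged (not Flink code). */
final class CleanupTimeModel {

    /** Cleanup time = max timestamp + allowed lateness, saturating at Long.MAX_VALUE. */
    static long cleanupTime(long maxTimestamp, long allowedLateness) {
        long cleanup = maxTimestamp + allowedLateness;
        // If the addition overflowed, treat the cleanup time as "end of time".
        return cleanup >= maxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    /** True once the watermark has reached the cleanup time; never for "end of time". */
    static boolean isPurged(long watermark, long maxTimestamp, long allowedLateness) {
        long cleanup = cleanupTime(maxTimestamp, allowedLateness);
        return cleanup != Long.MAX_VALUE && watermark >= cleanup;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;           // a window covering [0, 60000)
        long maxTimestamp = windowEnd - 1;  // assumption: max timestamp is end - 1

        // Zero lateness (the default): purged as soon as the watermark reaches 59999.
        System.out.println(isPurged(59_999L, maxTimestamp, 0L));
        // Ten seconds of lateness: still kept at 59999, purged at 69999.
        System.out.println(isPurged(59_999L, maxTimestamp, 10_000L));
        System.out.println(isPurged(69_999L, maxTimestamp, 10_000L));
        // A global window has max timestamp Long.MAX_VALUE and is never purged this way.
        System.out.println(isPurged(Long.MAX_VALUE, Long.MAX_VALUE, 0L));
    }
}
```

In this model only a trigger that purges can clear a window whose max timestamp is Long.MAX_VALUE.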

Best,
Aljoscha

> On 18. Jul 2017, at 15:05, jad mad <ja...@gmail.com> wrote:
> 
> Aljoscha,
> 
> what a great answer and this is what I'd expected!
> 
> As a workaround I've modified SlidingEventTimeWindows a little into a custom WindowAssigner, shown below.
> The differences are:
> 1. it stores the first timestamp in a variable "first_timestamp",
> 2. it uses this timestamp as the start time of all following windows.
> @PublicEvolving
> public class MySlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
>     private static final long serialVersionUID = 1L;
>     private final long size;
>     private final long slide;
>     private final long offset;
>     private long first_timestamp = -1L; // added by me!
> 
>     protected MySlidingEventTimeWindows(long size, long slide, long offset) {
>         if(offset >= 0L && offset < slide && size > 0L) {
>             this.size = size;
>             this.slide = slide;
>             this.offset = offset;
>         } else {
>             throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
>         }
>     }
> 
>     @Override
>     public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
>         if (timestamp == Long.MIN_VALUE) {
>             throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
>         }
>         if (this.first_timestamp == -1L) { // added by me!
>             this.first_timestamp = timestamp;
>             System.out.println("===================== " + this.first_timestamp + " ========================");
>         }
>         List<TimeWindow> windows = new ArrayList<>((int) (this.size / this.slide));
>         long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide);
> 
>         for (long start = lastStart; start > timestamp - this.size; start -= this.slide) {
>             // windows.add(new TimeWindow(start, start + this.size)); // original implementation
>             windows.add(new TimeWindow(this.first_timestamp, start + this.size)); // modified by me!
>         }
>         return windows;
>     }
>     // (the remaining WindowAssigner methods were not included in the post)
> }
> the result I get from MyWindowFunction(...) is like below : 
> 2017-01-01 00:17:39	2017-01-01 00:00:01	2
> 2017-01-01 00:17:39	2017-01-01 00:00:02	4
> 2017-01-01 00:17:39	2017-01-01 00:00:03	4
> 2017-01-01 00:17:39	2017-01-01 00:00:04	10
> 2017-01-01 00:17:39	2017-01-01 00:00:05	19
> 2017-01-01 00:17:39	2017-01-01 00:00:06	19
> 2017-01-01 00:17:39	2017-01-01 00:00:07	20
> 2017-01-01 00:17:39	2017-01-01 00:00:08	23
> 2017-01-01 00:17:39	2017-01-01 00:00:09	21
> 2017-01-01 00:17:39	2017-01-01 00:00:10	7
> 2017-01-01 00:17:39	2017-01-01 00:00:11	2
> 2017-01-01 00:17:39	2017-01-01 00:00:12	5
> 2017-01-01 00:17:39	2017-01-01 00:00:13	12
> 2017-01-01 00:17:39	2017-01-01 00:00:14	17
> 2017-01-01 00:17:39	2017-01-01 00:00:15	9
> 2017-01-01 00:17:39	2017-01-01 00:00:16	8
> 
> things I don't seem to understand are 
> 1. when my inputs' first line time stamp is 2017-01-01 00:00:00 why is 2017-01-01 00:17:39 shown up in my result as 
>      each sliding window's start time?
>     basically, I'm just printing out the time stamp came with the first iterable object's element in MyWindowFunction.
> 2. I made MyWindowAssigner in a hope that the starting time is fixed and the contents not being purged.
>     however, from the results, we can see it works just as a normal EventTimeSlidingWindow with contents
>     been purged.
>     How can I make it not to throw away its window contents even after each time firing.
> 3. this MyWindowAssigner(...) attempt arose as an effort based on your previous advice using a 
>     different WindowFunction. wonder if I'm heading to the right direction or not.
>     
> thank you very much!
> jad
> 
> On Mon, Jul 17, 2017 at 7:22 PM, Aljoscha Krettek <aljoscha@apache.org <ma...@apache.org>> wrote:
> Ah, I see. The problem is that the watermark has slightly tricky semantics: A watermark T says that there will not be elements with a timestamp <= T in the future. It does not say, that there have not yet been elements with a timestamp > T. In your specific case, this means that there will be elements in the GlobalWindow that have a timestamp that is after the firing timestamp of your trigger. If you want to make sure that windows are somehow put into buckets, based on their timestamp then you need to use a different WindowFunction, because GlobalWindows simply puts every element into the same bucket (window).
> 
> Regarding the firing timestamp, it’s currently not possible to get that from within a WindowFunction.
> 
> Best,
> Aljoscha
> 
> 
>> On 16. Jul 2017, at 12:16, jad mad <jadmad0828@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Hello Aljoscha,
>> 
>> thank you very much for your reply. the issue with me is two-fold.
>> first of all, 
>> the thing I wanted to achieve was having a GlobalWindows and let it fire 
>> periodically, say 1 hour or 1 day, and then do some custom calculation.
>> this custom trigger part I've implemented seems working well.
>> 
>> currently, when every time my custom trigger fires periodically, the elements of iterable object
>> passed onto my custom WindowFunction contains whole inputs from the start to the end rather than
>> from start to the timing(event time timestamp) where each time trigger fires.
>> have been worked on this for a week now but not being able to find any solution yet.
>> 
>> input example. 
>> 2017-07-16 00:00:01, x
>> 2017-07-16 00:00:12, x
>> 2017-07-16 01:03:06, x
>> 2017-07-16 02:20:10, x
>> 
>> In this case, a GlobalWindows with 1-hour periodical trigger, designed to count the cumulative record in MyWindowFunction should emit something like
>> 2017-07-16 00:00:00 ~ 2017-07-16 01:00:00, 2
>> 2017-07-16 00:00:00 ~ 2017-07-16 02:00:00, 3
>> 2017-07-16 00:00:00 ~ 2017-07-16 03:00:00, 4
>> ↑ the start time stamp doesn't change!
>> 
>> now, what I get is like
>> 2017-07-16 00:00:00 ~ , 4
>> 2017-07-16 00:00:00 ~ , 4
>> 2017-07-16 00:00:00 ~ , 4
>> ↑every line the same results...
>> 
>> public class MyWindowFunction<T, W extends Window>  implements WindowFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple, W> {
>> 
>>     @Override
>>     public void apply(Tuple tuple, W window, Iterable<Tuple2<String, String>> iterable, Collector<Tuple3<String, String, String>> out) throws Exception {
>> 
>>         for(Tuple2<String, String> element : iterable)
>>         {
>>             ...
>>         }
>>         out.collect(new Tuple3<String, String,  String>("...", "...", "..."));
>>     }
>> }
>> Secondly, for a GlobalWindows firing periodically, how do you get the periodical firing time stamp inside of
>> your MyWindowFunction? (the missing ~ part of ending time stamp in above example)
>> 
>> really appreciate the help!
>> jad
>> 
>> 
>> On Sun, Jul 16, 2017 at 6:15 PM, Aljoscha Krettek <aljoscha@apache.org <ma...@apache.org>> wrote:
>> Hi,
>> 
>> Ok, then I misunderstood. Yes, a PurgingTrigger is similar (the same) to always returning FIRE_AND_PURGE instead of FIRE in a custom Trigger. I thought your problem was that data is never cleared away when using GlobalWindows. Is that not the case?
>> 
>> Best,
>> Aljoscha
>> 
>>> On 14. Jul 2017, at 16:29, jad mad <jadmad0828@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> Hi Aljoscha
>>> 
>>> thanks for the comment. 
>>> is wrapping by a PurgingTrigger.of() the same as doing "return TriggerResult.FIRE_AND_PURGE;" 
>>> inside of a custom trigger?
>>> 
>>> gave it a test and the result seems the opposite of what I meant...
>>> instead of throwing away previous windows' contents, I wanna keep them
>>> all the way till the end. 
>>> that way I can get the cumulative counts of all input.
>>> 
>>> wonder how to achieve it.
>>> anyone?
>>> 
>>> jad
>>> 
>>> 
>>> On Fri, Jul 14, 2017 at 12:16 AM, Aljoscha Krettek <aljoscha@apache.org <ma...@apache.org>> wrote:
>>> Window contents are only purged from state if the Trigger says so or if the watermark passes the garbage collection horizon for a given window. With GlobalWindows, the GC horizon is never reached, that leaves Triggers.
>>> 
>>> You can create a Trigger that purges every time it fires by wrapping it in a PurgingTrigger, i.e.
>>> 
>>> .trigger(PurgingTrigger.of(<my trigger>))
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>>> On 13. Jul 2017, at 14:00, jad mad <jadmad0828@gmail.com <ma...@gmail.com>> wrote:
>>>> 
>>>> Hi Prashant,
>>>> 
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>> 
>>>> actually I could make my custom trigger to fire periodically.
>>>> The problem is the element set stored in the iterable variable 
>>>> is always uniform which is not what I'm expecting...
>>>> 
>>>> private static class MyWindowFunction_Window...
>>>>          ...    
>>>>        @Override
>>>>         public void apply(Tuple tuple, W window, Iterable<MyClass> iterable,
>>>>              ...
>>>>              for(MyClass element : iterable)
>>>> 
>>>> does anyone have any idea on this?
>>>> thanks a lot in advance,
>>>> jad
>>>> 
>>>> 
>>>> On Thu, Jul 13, 2017 at 10:55 AM, prashantnayak <prashant@intellifylearning.com <ma...@intellifylearning.com>> wrote:
>>>> Hi
>>>> 
>>>> We have custom operators using global windows and are using event time.
>>>> 
>>>> How are you specifying event time as the time characteristic?
>>>> 
>>>> Prashant
>>>> 
>>>> 
>>>> 
>>>> --
>>>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/global-window-trigger-tp14206p14239.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/global-window-trigger-tp14206p14239.html>
>>>> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com <http://nabble.com/>.
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 
> 
> On Mon, Jul 17, 2017 at 7:22 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Ah, I see. The problem is that the watermark has slightly tricky semantics: a watermark T says that there will be no more elements with a timestamp <= T in the future. It does not say that there have not yet been elements with a timestamp > T. In your specific case, this means that the GlobalWindow can contain elements whose timestamp is after the firing timestamp of your trigger. If you want elements to be put into buckets based on their timestamp, then you need to use a different WindowAssigner, because GlobalWindows simply puts every element into the same bucket (window).
> 
> Regarding the firing timestamp, it’s currently not possible to get that from within a WindowFunction.
> 
> Best,
> Aljoscha
> 
> 
>> On 16. Jul 2017, at 12:16, jad mad <jadmad0828@gmail.com> wrote:
>> 
>> Hello Aljoscha,
>> 
>> Thank you very much for your reply. My issue is two-fold.
>> First of all, what I want to achieve is a GlobalWindows that fires
>> periodically, say every hour or every day, and then runs some custom calculation.
>> The custom trigger I've implemented for this seems to work well.
>> 
>> Currently, every time my custom trigger fires, the iterable passed to my
>> custom WindowFunction contains all inputs from start to end, rather than
>> the inputs from the start up to the event-time timestamp at which the trigger fired.
>> I have been working on this for a week now but have not been able to find a solution.
>> 
>> Input example:
>> 2017-07-16 00:00:01, x
>> 2017-07-16 00:00:12, x
>> 2017-07-16 01:03:06, x
>> 2017-07-16 02:20:10, x
>> 
>> In this case, a GlobalWindows with a 1-hour periodic trigger, with MyWindowFunction designed to count records cumulatively, should emit something like
>> 2017-07-16 00:00:00 ~ 2017-07-16 01:00:00, 2
>> 2017-07-16 00:00:00 ~ 2017-07-16 02:00:00, 3
>> 2017-07-16 00:00:00 ~ 2017-07-16 03:00:00, 4
>> ↑ the start timestamp doesn't change!
>> 
>> What I get now is
>> 2017-07-16 00:00:00 ~ , 4
>> 2017-07-16 00:00:00 ~ , 4
>> 2017-07-16 00:00:00 ~ , 4
>> ↑ every line shows the same result...
>> 
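>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
>> import org.apache.flink.streaming.api.windowing.windows.Window;
>> import org.apache.flink.util.Collector;
>> 
>> // Called on every trigger firing with the current contents of the window buffer.
>> public class MyWindowFunction<T, W extends Window> implements WindowFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple, W> {
>> 
>>     @Override
>>     public void apply(Tuple tuple, W window, Iterable<Tuple2<String, String>> iterable, Collector<Tuple3<String, String, String>> out) throws Exception {
>>         // Iterates over whatever is buffered for this key and window at firing time.
>>         for (Tuple2<String, String> element : iterable) {
>>             ...
>>         }
>>         out.collect(new Tuple3<>("...", "...", "..."));
>>     }
>> }
>> 
>> Secondly, for a GlobalWindows that fires periodically, how do you get the firing timestamp inside of
>> MyWindowFunction? (That is the missing end timestamp after the ~ in the output above.)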
>> 
>> really appreciate the help!
>> jad
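
The expected output in the example above can be reproduced by filtering the buffered elements against the firing timestamp before counting. What follows is a minimal plain-Java sketch of that idea and not Flink code: the class CumulativeCountSketch, the helpers millis and countUpTo, and the hard-coded firing times are all assumptions made for illustration. In a real job the firing time would have to come from the function's context, for example the ProcessWindowFunction suggested elsewhere in this thread.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; no Flink classes are used.
class CumulativeCountSketch {

    // Epoch milliseconds (UTC) for a timestamp such as "2017-07-16T00:00:01".
    static long millis(String isoDateTime) {
        return LocalDateTime.parse(isoDateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Counts buffered elements whose timestamp is strictly before the firing time.
    static long countUpTo(List<Long> bufferedTimestamps, long firingTime) {
        return bufferedTimestamps.stream().filter(ts -> ts < firingTime).count();
    }

    public static void main(String[] args) {
        // The whole buffer of the global window: all four sample records.
        List<Long> buffer = Arrays.asList(
                millis("2017-07-16T00:00:01"),
                millis("2017-07-16T00:00:12"),
                millis("2017-07-16T01:03:06"),
                millis("2017-07-16T02:20:10"));

        // Hypothetical hourly firing times taken from the sample output.
        String[] firings = {"2017-07-16T01:00:00", "2017-07-16T02:00:00", "2017-07-16T03:00:00"};
        for (String end : firings) {
            long count = countUpTo(buffer, millis(end));
            System.out.println("2017-07-16 00:00:00 ~ " + end.replace('T', ' ') + ", " + count);
        }
    }
}
```

Even though the buffer always holds all four records, the filter yields the counts 2, 3 and 4 for the three firing times, because only records before each firing time are counted.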
>> 
>> 
>> On Sun, Jul 16, 2017 at 6:15 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> Hi,
>> 
>> Ok, then I misunderstood. Yes, a PurgingTrigger is similar to (the same as) always returning FIRE_AND_PURGE instead of FIRE in a custom Trigger. I thought your problem was that data is never cleared away when using GlobalWindows. Is that not the case?
>> 
>> Best,
>> Aljoscha
>> 


Re: global window trigger

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Ok, then I misunderstood. Yes, a PurgingTrigger is similar to (the same as) always returning FIRE_AND_PURGE instead of FIRE in a custom Trigger. I thought your problem was that data is never cleared away when using GlobalWindows. Is that not the case?

Best,
Aljoscha
> On 14. Jul 2017, at 16:29, jad mad <ja...@gmail.com> wrote:
> 
> Hi Aljoscha
> 
> thanks for the comment. 
> is wrapping by a PurgingTrigger.of() the same as doing "return TriggerResult.FIRE_AND_PURGE;" 
> inside of a custom trigger?
> 
> gave it a test and the result seems the opposite of what I meant...
> instead of throwing away previous windows' contents, I wanna keep them
> all the way till the end. 
> that way I can get the cumulative counts of all input.
> 
> wonder how to achieve it.
> anyone?
> 
> jad
> 
> 


Re: global window trigger

Posted by jad mad <ja...@gmail.com>.
Hi Aljoscha,

Thanks for the comment.
Is wrapping a trigger in PurgingTrigger.of() the same as doing "return
TriggerResult.FIRE_AND_PURGE;" inside a custom trigger?

I gave it a test, and the result seems to be the opposite of what I meant:
instead of throwing away the previous window contents, I want to keep them
all the way to the end.
That way I can get cumulative counts over all input.

I wonder how to achieve that.
Anyone?

jad


On Fri, Jul 14, 2017 at 12:16 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Window contents are only purged from state if the Trigger says so or if
> the watermark passes the garbage collection horizon for a given window.
> With GlobalWindows, the GC horizon is never reached, that leaves Triggers.
>
> You can create a Trigger that purges every time it fires by wrapping it in
> a PurgingTrigger, i.e.
>
> .trigger(PurgingTrigger.of(<my trigger>))
>
> Best,
> Aljoscha
>

Re: global window trigger

Posted by Aljoscha Krettek <al...@apache.org>.
Window contents are only purged from state if the Trigger says so or if the watermark passes the garbage collection horizon for a given window. With GlobalWindows, the GC horizon is never reached, which leaves Triggers.

You can create a Trigger that purges every time it fires by wrapping it in a PurgingTrigger, i.e.

.trigger(PurgingTrigger.of(<my trigger>))

Best,
Aljoscha
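
To make the effect of purging concrete, here is a small plain-Java simulation of a window buffer under the two trigger results. It is only a sketch and uses no Flink classes: WindowBufferSketch, fire, run and the purge flag are hypothetical names, with purge = true standing in for FIRE_AND_PURGE and purge = false standing in for FIRE.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a window buffer; this is not the Flink implementation.
class WindowBufferSketch {
    private final List<String> buffer = new ArrayList<>();

    void add(String element) {
        buffer.add(element);
    }

    // Returns the number of elements a window function would see on this firing.
    // purge = true models FIRE_AND_PURGE, purge = false models FIRE.
    int fire(boolean purge) {
        int seen = buffer.size();
        if (purge) {
            buffer.clear();
        }
        return seen;
    }

    // Adds two elements, fires, adds one more, fires again; returns both counts.
    static int[] run(boolean purge) {
        WindowBufferSketch window = new WindowBufferSketch();
        window.add("a");
        window.add("b");
        int first = window.fire(purge);
        window.add("c");
        int second = window.fire(purge);
        return new int[] {first, second};
    }

    public static void main(String[] args) {
        int[] kept = run(false);
        int[] purged = run(true);
        System.out.println("FIRE:           " + kept[0] + ", " + kept[1]);     // 2, 3
        System.out.println("FIRE_AND_PURGE: " + purged[0] + ", " + purged[1]); // 2, 1
    }
}
```

With purging, each firing sees only what arrived since the previous firing. Without it, the buffer keeps growing, so every firing sees everything received so far.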

> On 13. Jul 2017, at 14:00, jad mad <ja...@gmail.com> wrote:
> 
> Hi Prashant,
> 
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 
> actually I could make my custom trigger to fire periodically.
> The problem is the element set stored in the iterable variable 
> is always uniform which is not what I'm expecting...
> 
> private static class MyWindowFunction_Window...
>          ...    
>        @Override
>         public void apply(Tuple tuple, W window, Iterable<MyClass> iterable,
>              ...
>              for(MyClass element : iterable)
> 
> does anyone have any idea on this?
> thanks a lot in advance,
> jad
> 
> 


Re: global window trigger

Posted by jad mad <ja...@gmail.com>.
Hi Prashant,

I set event time like this:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Actually, I was able to make my custom trigger fire periodically.
The problem is that the set of elements in the iterable variable
is the same on every firing, which is not what I'm expecting...

private static class MyWindowFunction_Window...
    ...
    @Override
    public void apply(Tuple tuple, W window, Iterable<MyClass> iterable,
            ...
        for (MyClass element : iterable)

Does anyone have any idea about this?
thanks a lot in advance,
jad
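
The custom trigger itself is not shown in this thread. As a rough illustration of how a periodic event-time firing time can be derived from an element timestamp, the plain-Java sketch below rounds a timestamp up to the next interval boundary. PeriodicFiringSketch and nextFiringTime are hypothetical names, and the 1-minute interval is taken from the original question. A real Flink Trigger would additionally have to register an event-time timer for the computed value, which this sketch leaves out.

```java
// Plain-Java illustration only; no Flink classes are used.
class PeriodicFiringSketch {

    // Start of the interval containing the timestamp, plus one interval.
    // Assumes a non-negative timestamp and a positive interval, both in milliseconds.
    static long nextFiringTime(long timestampMillis, long intervalMillis) {
        long start = timestampMillis - (timestampMillis % intervalMillis);
        return start + intervalMillis;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        System.out.println(nextFiringTime(0L, minute));      // 60000
        System.out.println(nextFiringTime(59_999L, minute)); // 60000
        System.out.println(nextFiringTime(60_000L, minute)); // 120000
        System.out.println(nextFiringTime(61_500L, minute)); // 120000
    }
}
```

An element that falls exactly on a boundary is assigned to the following firing time, so all firing times are multiples of the interval.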


On Thu, Jul 13, 2017 at 10:55 AM, prashantnayak <
prashant@intellifylearning.com> wrote:

> Hi
>
> We have custom operators using global windows and are using event time.
>
> How are you specifying event time as the time characteristic?
>
> Prashant
>
>
>

Re: global window trigger

Posted by prashantnayak <pr...@intellifylearning.com>.
Hi 

We have custom operators using global windows and are using event time.

How are you specifying event time as the time characteristic?

Prashant



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/global-window-trigger-tp14206p14239.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.