Posted to user@flink.apache.org by chen <er...@126.com> on 2017/12/16 16:07:06 UTC

flink eventTime, lateness, maxoutoforderness

eventTime, lateness, and maxOutOfOrderness are all about time.
As I understand it, event time is the timestamp carried on the record.
Is lateness measured in record time or in real-world time?
Is maxOutOfOrderness measured in record time or in real-world time?

dataStream.keyBy(row -> (String) row.getField(0))
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(5))
        .fold(initRow(), new MyFoldFunction());

public Watermark getCurrentWatermark() {
    return new Watermark(currentTime - 5000);
}

Could anyone explain how time is measured for eventTime, lateness, and maxOutOfOrderness?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: flink eventTime, lateness, maxoutoforderness

Posted by chen <er...@126.com>.
Code showing the effect of maxOutOfOrderness:
        dataStream.keyBy(row -> (String) row.getField(0))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold(initRow(), new FoldFunction<Row, Row>() {
                    @Override
                    public Row fold(Row ret, Row o) throws Exception {
                        // Field 0 counts the records; field 1 collects field 1 of each record.
                        ret.setField(0, (int) ret.getField(0) + 1);
                        ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
                        return ret;
                    }
                });

        public Watermark getCurrentWatermark() {
            return new Watermark(currentTime - 5000);
        }

1. Sending the data without Thread.sleep() gives this result:
        1,1483250636000|
        4,1483250640000|1483250642000|1483250641000|1483250643000|
        4,1483250649000|1483250648000|1483250645000|1483250647000|
        2,1483250650000|1483250653000|
        1,1483250658000|
        3,1483250661000|1483250662000|1483250663000|
        1,1483250667000|

2. Sending the data with Thread.sleep() gives the result below. Here we can
see the effect of maxOutOfOrderness: it delays the window computation before
the result is emitted.
        1,1483250636000|
        2,1483250640000|1483250642000|
        3,1483250649000|1483250648000|1483250645000|
        2,1483250650000|1483250653000|
        1,1483250658000|
        3,1483250661000|1483250662000|1483250663000|
        1,1483250667000|

I don't know how to explain eventTime, lateness, and maxOutOfOrderness.




Re: flink eventTime, lateness, maxoutoforderness

Posted by chen <er...@126.com>.
Hi Eron,
    Thanks for your help. I understand that maxOutOfOrderness and lateness
are supposed to be based on event time, but in my test they do not appear to
be. Below are my test data and code.
         "key1|1483250640000|",
         "key1|1483250636000|",
         "key1|1483250649000|",
         "key1|1483250642000|",
         "key1|1483250650000|",
         "key1|1483250641000|",
         "key1|1483250653000|",
         "key1|1483250648000|",
         "key1|1483250645000|",
         "key1|1483250658000|",
         "key1|1483250647000|",
         "key1|1483250643000|",
         "key1|1483250661000|",
         "key1|1483250662000|",
         "key1|1483250667000|",
         "key1|1483250663000|",

        dataStream.keyBy(row -> (String) row.getField(0))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5))
                .fold(initRow(), new FoldFunction<Row, Row>() {
                    @Override
                    public Row fold(Row ret, Row o) throws Exception {
                        // Field 0 counts the records; field 1 collects field 1 of each record.
                        ret.setField(0, (int) ret.getField(0) + 1);
                        ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
                        return ret;
                    }
                });

1. Sending the data without Thread.sleep() gives this result:
         1,1483250636000|
         4,1483250640000|1483250642000|1483250641000|1483250643000|
         4,1483250649000|1483250648000|1483250645000|1483250647000|
         2,1483250650000|1483250653000|
         1,1483250658000|
         3,1483250661000|1483250662000|1483250663000|
         1,1483250667000|
2. Sending the data with Thread.sleep() gives the result below. Here we can
see the effect of allowedLateness: each late record triggers the window
computation again, so an updated result is emitted.
          1,1483250636000|
          1,1483250640000|
          2,1483250640000|1483250642000|
          1,1483250649000|
          2,1483250649000|1483250648000|
          3,1483250649000|1483250648000|1483250645000|
          2,1483250650000|1483250653000|
          1,1483250658000|
          2,1483250661000|1483250662000|
          3,1483250661000|1483250662000|1483250663000|
          1,1483250667000|
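
As a cross-check on the first result above, here is a minimal plain-Java
sketch (no Flink dependency) that assigns the test timestamps to 5-second
tumbling windows and counts them. The class and method names are made up for
illustration, and the sketch assumes a zero window offset and non-negative
timestamps. It only models window assignment, not watermarks or triggers.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowCount {
    static final long WINDOW_SIZE_MS = 5000L;

    // Start of the 5-second tumbling window that contains the timestamp
    // (assumes non-negative timestamps and a zero window offset).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Counts records per window, keyed by window start in ascending order.
    static Map<Long, Integer> countPerWindow(long[] timestampsMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The 16 test timestamps from this message, in arrival order.
        long[] testData = {
            1483250640000L, 1483250636000L, 1483250649000L, 1483250642000L,
            1483250650000L, 1483250641000L, 1483250653000L, 1483250648000L,
            1483250645000L, 1483250658000L, 1483250647000L, 1483250643000L,
            1483250661000L, 1483250662000L, 1483250667000L, 1483250663000L
        };
        countPerWindow(testData).forEach(
            (start, n) -> System.out.println(start + " -> " + n));
    }
}
```

The per-window counts come out as 1, 4, 4, 2, 1, 3, 1, which matches result 1.
So in that run every record was counted in its window exactly once, regardless
of arrival order.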





Re: flink eventTime, lateness, maxoutoforderness

Posted by Eron Wright <er...@gmail.com>.
Take a look at the section of Flink documentation titled "Event Time and
Watermarks":
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html#event-time-and-watermarks

Also read the excellent "Streaming 101" and "Streaming 102" articles, which
have useful animations depicting the flow of time:
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

Think of the watermark as a clock, ticking along due to information from
the connector or from a watermark generator.

Hope this helps,
Eron



Re: flink eventTime, lateness, maxoutoforderness

Posted by chen <er...@126.com>.
Thanks Gordon. Please see my reply. I ran the code, but the result is not
what the documentation describes.





Re: flink eventTime, lateness, maxoutoforderness

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

> lateness is record time or the real-world time?
> maxoutoforderness is record time or the real-world time?

Both the allowed lateness of window operators and the maxOutOfOrderness of the BoundedOutOfOrdernessTimestampExtractor refer to event time.

i.e.,
- given the end timestamp of a window is x (in event time) and allowed lateness is y, the window state is cleared only when the current watermark of the window operator passes x+y
- the BoundedOutOfOrdernessTimestampExtractor emits watermarks that lag behind the max record timestamp (in event time) by a fixed amount of time (again, in event time).
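
To make the two rules above concrete, here is a minimal plain-Java sketch (no
Flink dependency). It is a simplified model, not Flink's implementation: the
names EventTimeSketch, watermark, windowEnd and classify are made up for
illustration, the 5-second window size is taken from the question, and the
exact boundary handling (Flink works with the last timestamp inside the
window, one millisecond before its end) is deliberately ignored. It also does
not model that periodic watermark assigners are polled on a wall-clock
interval, so the watermark may advance later than shown here.

```java
public class EventTimeSketch {
    static final long WINDOW_SIZE_MS = 5000L;

    // Watermark that lags the highest event timestamp seen so far by a fixed bound.
    static long watermark(long maxTimestampSeenMs, long maxOutOfOrdernessMs) {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    // Exclusive end x of the tumbling window that contains the timestamp.
    static long windowEnd(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
    }

    // Simplified classification of a record against the current watermark.
    static String classify(long timestampMs, long watermarkMs, long allowedLatenessMs) {
        long end = windowEnd(timestampMs);
        if (watermarkMs < end) {
            return "ON_TIME";        // the window has not fired yet
        }
        if (watermarkMs < end + allowedLatenessMs) {
            return "LATE_ACCEPTED";  // the window already fired, its state is still kept
        }
        return "DROPPED";            // the watermark reached x + y, the state is cleared
    }

    public static void main(String[] args) {
        // First four test timestamps from the thread, in arrival order.
        long[] arrivals = {1483250640000L, 1483250636000L, 1483250649000L, 1483250642000L};
        long maxSeen = Long.MIN_VALUE;
        for (long ts : arrivals) {
            // Each record is checked against the watermark derived from earlier
            // records, here with maxOutOfOrderness = 0 and allowed lateness = 5 s.
            long wm = (maxSeen == Long.MIN_VALUE) ? Long.MIN_VALUE : watermark(maxSeen, 0L);
            System.out.println(ts + " -> " + classify(ts, wm, 5000L));
            maxSeen = Math.max(maxSeen, ts);
        }
    }
}
```

In this model, with a bound of 0 and 5 seconds of allowed lateness, the second
and fourth records arrive after their window has fired but before its state
is cleared, so they are classified as late but accepted. Every quantity in the
sketch is an event-time value; no wall-clock time appears anywhere.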

Hope this clarifies things for you!

Cheers,
Gordon