You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Felipe Gutierrez <fe...@gmail.com> on 2018/03/16 17:08:54 UTC

How to correct use timeWindow() with DataStream?

Hi all,

I am building an example with the Flink DataStream API that has a fake source
generator of LogLine(Date d, String line). I want to work with watermarks
on it, so I created a class that implements AssignerWithPeriodicWatermarks.
If I don't apply the ".timeWindow(Time.seconds(2))" operator to the data stream,
I can group by second and concatenate the lines. When I do apply
".timeWindow(Time.seconds(2))", nothing is shown in the output. I guess I
misunderstood something when I was reading about event time. Could anyone
help me, please? My source code is as follows.

Thanks for the ideas. Kind Regards,  Felipe

package flink.example.streaming;

import flink.util.LogLine;
import flink.util.LogSourceFunction;
import flink.util.UtilDate;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable;
import java.util.Date;

public class EventTimeStreamExampleJava {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<LogLine> dataStream = env
                .addSource(new LogSourceFunction())
                .assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessGenerator())
                .keyBy(lineLog -> lineLog.getSec())
                // .timeWindow(Time.seconds(2))
                .reduce((log1, log2) -> new LogLine(log1.getTime(),
log1.getLine() + " | " + log2.getLine()))
                ;

        dataStream.print();

        env.execute("Window LogRead");
    }

    public static class BoundedOutOfOrdernessGenerator implements
AssignerWithPeriodicWatermarks<LogLine> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(LogLine element, long
previousElementTimestamp) {
            long timestamp = element.getTime().getSeconds();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the
out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}

package flink.util;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Source that keeps emitting {@link LogLine} records built from
 * {@code UtilDate.getRandomSec()} and {@code UtilDate.getRandomString()} until it is cancelled.
 */
public class LogSourceFunction implements SourceFunction<LogLine> {

    // Cleared by cancel(); volatile so the emitting loop observes the change.
    private volatile boolean isRunning = true;

    /**
     * Emits one freshly generated record per iteration for as long as the source is running.
     *
     * @param ctx context used to emit the generated records
     */
    @Override
    public void run(SourceContext<LogLine> ctx) throws Exception {
        for (;;) {
            if (!isRunning) {
                return;
            }
            LogLine generated = new LogLine(UtilDate.getRandomSec(), UtilDate.getRandomString());
            ctx.collect(generated);
        }
    }

    /** Signals {@link #run} to stop before its next iteration. */
    @Override
    public void cancel() {
        this.isRunning = false;
    }
}

package flink.util;

import java.util.Date;
import java.util.Objects;

/**
 * A single log record: a timestamp, the second-of-minute used as grouping key, and the text.
 */
public class LogLine {

    private Date time;
    private int sec;
    private String line;

    /** Creates an empty record; fields are expected to be populated through the setters. */
    public LogLine() {
    }

    /**
     * Creates a record from a timestamp; {@code sec} is taken from {@code time.getSeconds()}.
     *
     * @param time the record's timestamp (must not be null)
     * @param line the record's text
     */
    public LogLine(Date time, String line) {
        this.sec = time.getSeconds();
        this.time = time;
        this.line = line;
    }

    /**
     * Creates a record from a second value; the timestamp comes from
     * {@code UtilDate.getRandomDate(sec)}.
     *
     * @param sec the second value stored as {@code sec} and passed to the date helper
     * @param line the record's text
     */
    public LogLine(int sec, String line) {
        this.sec = sec;
        this.time = UtilDate.getRandomDate(sec);
        this.line = line;
    }

    public int getSec() {
        return sec;
    }

    public void setSec(int sec) {
        this.sec = sec;
    }

    public Date getTime() {
        return time;
    }

    public String getLine() {
        return line;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        LogLine logLine = (LogLine) o;
        // sec is a primitive, so compare it directly instead of boxing through Objects.equals.
        return sec == logLine.sec &&
                Objects.equals(time, logLine.time) &&
                Objects.equals(line, logLine.line);
    }

    @Override
    public int hashCode() {
        return Objects.hash(time, sec, line);
    }

    @Override
    public String toString() {
        // Close the single quote opened before the line text so the output is balanced.
        return "LogLine{" +
                "time=" + time +
                ", sec=" + sec +
                ", line='" + line + '\'' +
                '}';
    }
}


-- 

*---- Felipe Oliveira Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*

Re: How to correct use timeWindow() with DataStream?

Posted by Felipe Gutierrez <fe...@gmail.com>.
thanks,

I did it using ".timeWindowAll(Time.seconds(5), Time.seconds(1)).apply(new
LogLineAllWindowFunction());". My output now contains only the values inside
the window.

thanks, Felipe



On Mon, Mar 19, 2018 at 10:54 AM, Fabian Hueske <fh...@gmail.com> wrote:

> If you don't want to partition by key, i.e., have a single result for each
> time window, you should not use keyBy and an allWindow.
> However, this will only be executed with a parallelism of 1.
>
> 2018-03-19 13:54 GMT+01:00 Felipe Gutierrez <fe...@gmail.com>
> :
>
>> thanks a lot Fabian,
>>
>> It clarified my way to developing. I am using keyBy, timeWindow, and
>> apply monad operator at the EventTimeStreamExampleJava
>> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/example/streaming/EventTimeStreamExampleJava.java>
>> now. I am generating dates in order and with a bit out of orderness now at
>> LogSourceFunction
>> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogSourceFunction.java>.
>> And only using Date as my key at LogLine
>> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogLine.java>
>> object.
>>
>> If I understood watermarks well, my program should combine all the lines
>> that are inside the same watermark when I set ".timeWindow(Time.seconds(5),
>> Time.seconds(1))" and used ".apply(new LogLineCounterFunction())". But it
>> is still not happening because I didn't use a good key ".keyBy(lineLog ->
>> lineLog.getTime())" and my key at the LogLineCounterFunction class is still
>> the Date.
>>
>> public static class LogLineCounterFunction implements WindowFunction<
>>         LogLine, // input
>>         Tuple3<LogLine, Long, Integer>, // output
>>         Date, // key
>>         TimeWindow> { // window type
>>
>> What should I use as a key in my case?
>>
>> My output is combining only the lines with the same key (Date). I want to
>> combine the dates between the watermarks ".timeWindow(Time.seconds(5),
>> Time.seconds(1))"...
>>
>> 3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15
>> 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 |
>> 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534
>> | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
>> 16:31:08.534},1071516670000,9)
>> 3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15
>> 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 |
>> 2003-12-15 16:31:04.184},1071516670000,4)
>> 3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15
>> 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 |
>> 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
>> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
>> 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 |
>> 2003-12-15 16:31:00.884},1071516670000,12)
>> 3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15
>> 16:31:03.784},1071516670000,1)
>> 3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15
>> 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 |
>> 2003-12-15 16:31:06.334},1071516670000,4)
>>
>>
>>
>>
>> On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The timestamps of the stream records should be increasing (strict
>>> monotonicity is not required, a bit out of orderness can be handled due to
>>> watermarks).
>>> So, the events should also be generated with increasing timestamps. It
>>> looks like your generator generates random dates. I'd also generate data
>>> with millisecond precision, not just days.
>>>
>>> Also, a timestamp in Flink is the number of milliseconds since
>>> 1970-01-01-00:00:00.
>>> However, your timestamp extractor only returns the number of seconds
>>> since last minute (i.e., from 0 to 60). You should use Date.getTime()
>>> instead of Date.getSeconds().
>>>
>>> Best, Fabian
>>>
>>> 2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <
>>> felipe.o.gutierrez@gmail.com>:
>>>
>>>> Hi all,
>>>>
>>>> I am building an example with DataStream using Flink that has a fake
>>>> source generator of LogLine(Date d, String line). I want to work with
>>>> Watermarks on it so I created a class that implements AssignerWithPeriodicWatermarks.
>>>> If I don't use the monad ".timeWindow(Time.seconds(2))" on the data stream
>>>> I can group by second and concatenate the lines. When I use
>>>> ".timeWindow(Time.seconds(2))" nothing is shown on the output. I guess I
>>>> misunderstood something when I was reading about Event Time. Could anyone
>>>> help me please? My source code is as follow.
>>>>
>>>> Thanks for the ideas. Kind Regards,  Felipe
>>>>
>>>> package flink.example.streaming;
>>>>
>>>> import flink.util.LogLine;
>>>> import flink.util.LogSourceFunction;
>>>> import flink.util.UtilDate;
>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEn
>>>> vironment;
>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodi
>>>> cWatermarks;
>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>
>>>> import javax.annotation.Nullable;
>>>> import java.util.Date;
>>>>
>>>> public class EventTimeStreamExampleJava {
>>>>     public static void main(String[] args) throws Exception {
>>>>
>>>>         final StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>
>>>>         DataStream<LogLine> dataStream = env
>>>>                 .addSource(new LogSourceFunction())
>>>>                 .assignTimestampsAndWatermarks(new
>>>> BoundedOutOfOrdernessGenerator())
>>>>                 .keyBy(lineLog -> lineLog.getSec())
>>>>                 // .timeWindow(Time.seconds(2))
>>>>                 .reduce((log1, log2) -> new LogLine(log1.getTime(),
>>>> log1.getLine() + " | " + log2.getLine()))
>>>>                 ;
>>>>
>>>>         dataStream.print();
>>>>
>>>>         env.execute("Window LogRead");
>>>>     }
>>>>
>>>>     public static class BoundedOutOfOrdernessGenerator implements
>>>> AssignerWithPeriodicWatermarks<LogLine> {
>>>>
>>>>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>>>>
>>>>         private long currentMaxTimestamp;
>>>>
>>>>         @Override
>>>>         public long extractTimestamp(LogLine element, long
>>>> previousElementTimestamp) {
>>>>             long timestamp = element.getTime().getSeconds();
>>>>             currentMaxTimestamp = Math.max(timestamp,
>>>> currentMaxTimestamp);
>>>>             return timestamp;
>>>>         }
>>>>
>>>>         @Nullable
>>>>         @Override
>>>>         public Watermark getCurrentWatermark() {
>>>>             // return the watermark as current highest timestamp minus
>>>> the out-of-orderness bound
>>>>             return new Watermark(currentMaxTimestamp -
>>>> maxOutOfOrderness);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> package flink.util;
>>>>
>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>
>>>> public class LogSourceFunction implements SourceFunction<LogLine> {
>>>>
>>>>     private volatile boolean isRunning = true;
>>>>
>>>>     @Override
>>>>     public void run(SourceContext<LogLine> ctx) throws Exception {
>>>>         while (isRunning) {
>>>>             ctx.collect(new LogLine(UtilDate.getRandomSec(),
>>>> UtilDate.getRandomString()));
>>>>         }
>>>>     }
>>>>
>>>>     @Override
>>>>     public void cancel() {
>>>>         isRunning = false;
>>>>     }
>>>> }
>>>>
>>>> package flink.util;
>>>>
>>>> import java.util.Date;
>>>> import java.util.Objects;
>>>>
>>>> public class LogLine {
>>>>
>>>>     private Date time;
>>>>     private int sec;
>>>>     private String line;
>>>>
>>>>     public LogLine() {
>>>>     }
>>>>
>>>>     public LogLine(Date time, String line) {
>>>>         this.sec = time.getSeconds();
>>>>         this.time = time;
>>>>         this.line = line;
>>>>     }
>>>>
>>>>     public LogLine(int sec, String line) {
>>>>         this.sec = sec;
>>>>         this.time = UtilDate.getRandomDate(sec);
>>>>         this.line = line;
>>>>     }
>>>>
>>>>     public int getSec() {
>>>>         return sec;
>>>>     }
>>>>
>>>>     public void setSec(int sec) {
>>>>         this.sec = sec;
>>>>     }
>>>>
>>>>     public Date getTime() {
>>>>         return time;
>>>>     }
>>>>
>>>>     public String getLine() {
>>>>         return line;
>>>>     }
>>>>
>>>>     public void setTime(Date time) {
>>>>         this.time = time;
>>>>     }
>>>>
>>>>     public void setLine(String line) {
>>>>         this.line = line;
>>>>     }
>>>>
>>>>     @Override
>>>>     public boolean equals(Object o) {
>>>>         if (this == o) return true;
>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>         LogLine logLine = (LogLine) o;
>>>>         return Objects.equals(time, logLine.time) &&
>>>>                 Objects.equals(sec, logLine.sec) &&
>>>>                 Objects.equals(line, logLine.line);
>>>>     }
>>>>
>>>>     @Override
>>>>     public int hashCode() {
>>>>
>>>>         return Objects.hash(time, sec, line);
>>>>     }
>>>>
>>>>     @Override
>>>>     public String toString() {
>>>>         return "LogLine{" +
>>>>                 "time=" + time +
>>>>                 ", sec=" + sec +
>>>>                 ", line='" + line +
>>>>                 '}';
>>>>     }
>>>> }
>>>>
>>>>
>>>> --
>>>>
>>>> *---- Felipe Oliveira Gutierrez*
>>>>
>>>> *-- skype: felipe.o.gutierrez*
>>>> *--* *https://felipeogutierrez.blogspot.com
>>>> <https://felipeogutierrez.blogspot.com>*
>>>>
>>>
>>>
>>
>>
>> --
>>
>> *---- Felipe Oliveira Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>
>


-- 

*---- Felipe Oliveira Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*

Re: How to correct use timeWindow() with DataStream?

Posted by Fabian Hueske <fh...@gmail.com>.
If you don't want to partition by key, i.e., you want a single result for each
time window, you should not use keyBy; use an all-window (e.g., timeWindowAll) instead.
However, this will only be executed with a parallelism of 1.

2018-03-19 13:54 GMT+01:00 Felipe Gutierrez <fe...@gmail.com>:

> thanks a lot Fabian,
>
> It clarified my way to developing. I am using keyBy, timeWindow, and apply
> monad operator at the EventTimeStreamExampleJava
> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/example/streaming/EventTimeStreamExampleJava.java>
> now. I am generating dates in order and with a bit out of orderness now at
> LogSourceFunction
> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogSourceFunction.java>.
> And only using Date as my key at LogLine
> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogLine.java>
> object.
>
> If I understood watermarks well, my program should combine all the lines
> that are inside the same watermark when I set ".timeWindow(Time.seconds(5),
> Time.seconds(1))" and used ".apply(new LogLineCounterFunction())". But it
> is still not happening because I didn't use a good key ".keyBy(lineLog ->
> lineLog.getTime())" and my key at the LogLineCounterFunction class is still
> the Date.
>
> public static class LogLineCounterFunction implements WindowFunction<
>         LogLine, // input
>         Tuple3<LogLine, Long, Integer>, // output
>         Date, // key
>         TimeWindow> { // window type
>
> What should I use as a key in my case?
>
> My output is combining only the lines with the same key (Date). I want to
> combine the dates between the watermarks ".timeWindow(Time.seconds(5),
> Time.seconds(1))"...
>
> 3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534
> | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
> 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 |
> 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
> 16:31:08.534},1071516670000,9)
> 3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184
> | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15
> 16:31:04.184},1071516670000,4)
> 3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884
> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
> 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 |
> 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
> 16:31:00.884},1071516670000,12)
> 3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15
> 16:31:03.784},1071516670000,1)
> 3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334
> | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15
> 16:31:06.334},1071516670000,4)
>
>
>
>
> On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> The timestamps of the stream records should be increasing (strict
>> monotonicity is not required, a bit out of orderness can be handled due to
>> watermarks).
>> So, the events should also be generated with increasing timestamps. It
>> looks like your generator generates random dates. I'd also generate data
>> with millisecond precision, not just days.
>>
>> Also, a timestamp in Flink is the number of milliseconds since
>> 1970-01-01-00:00:00.
>> However, your timestamp extractor only returns the number of seconds
>> since last minute (i.e., from 0 to 60). You should use Date.getTime()
>> instead of Date.getSeconds().
>>
>> Best, Fabian
>>
>> 2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <felipe.o.gutierrez@gmail.com
>> >:
>>
>>> Hi all,
>>>
>>> I am building an example with DataStream using Flink that has a fake
>>> source generator of LogLine(Date d, String line). I want to work with
>>> Watermarks on it so I created a class that implements AssignerWithPeriodicWatermarks.
>>> If I don't use the monad ".timeWindow(Time.seconds(2))" on the data stream
>>> I can group by second and concatenate the lines. When I use
>>> ".timeWindow(Time.seconds(2))" nothing is shown on the output. I guess I
>>> misunderstood something when I was reading about Event Time. Could anyone
>>> help me please? My source code is as follow.
>>>
>>> Thanks for the ideas. Kind Regards,  Felipe
>>>
>>> package flink.example.streaming;
>>>
>>> import flink.util.LogLine;
>>> import flink.util.LogSourceFunction;
>>> import flink.util.UtilDate;
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEn
>>> vironment;
>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodi
>>> cWatermarks;
>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>
>>> import javax.annotation.Nullable;
>>> import java.util.Date;
>>>
>>> public class EventTimeStreamExampleJava {
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>>         DataStream<LogLine> dataStream = env
>>>                 .addSource(new LogSourceFunction())
>>>                 .assignTimestampsAndWatermarks(new
>>> BoundedOutOfOrdernessGenerator())
>>>                 .keyBy(lineLog -> lineLog.getSec())
>>>                 // .timeWindow(Time.seconds(2))
>>>                 .reduce((log1, log2) -> new LogLine(log1.getTime(),
>>> log1.getLine() + " | " + log2.getLine()))
>>>                 ;
>>>
>>>         dataStream.print();
>>>
>>>         env.execute("Window LogRead");
>>>     }
>>>
>>>     public static class BoundedOutOfOrdernessGenerator implements
>>> AssignerWithPeriodicWatermarks<LogLine> {
>>>
>>>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>>>
>>>         private long currentMaxTimestamp;
>>>
>>>         @Override
>>>         public long extractTimestamp(LogLine element, long
>>> previousElementTimestamp) {
>>>             long timestamp = element.getTime().getSeconds();
>>>             currentMaxTimestamp = Math.max(timestamp,
>>> currentMaxTimestamp);
>>>             return timestamp;
>>>         }
>>>
>>>         @Nullable
>>>         @Override
>>>         public Watermark getCurrentWatermark() {
>>>             // return the watermark as current highest timestamp minus
>>> the out-of-orderness bound
>>>             return new Watermark(currentMaxTimestamp -
>>> maxOutOfOrderness);
>>>         }
>>>     }
>>> }
>>>
>>> package flink.util;
>>>
>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>
>>> public class LogSourceFunction implements SourceFunction<LogLine> {
>>>
>>>     private volatile boolean isRunning = true;
>>>
>>>     @Override
>>>     public void run(SourceContext<LogLine> ctx) throws Exception {
>>>         while (isRunning) {
>>>             ctx.collect(new LogLine(UtilDate.getRandomSec(),
>>> UtilDate.getRandomString()));
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void cancel() {
>>>         isRunning = false;
>>>     }
>>> }
>>>
>>> package flink.util;
>>>
>>> import java.util.Date;
>>> import java.util.Objects;
>>>
>>> public class LogLine {
>>>
>>>     private Date time;
>>>     private int sec;
>>>     private String line;
>>>
>>>     public LogLine() {
>>>     }
>>>
>>>     public LogLine(Date time, String line) {
>>>         this.sec = time.getSeconds();
>>>         this.time = time;
>>>         this.line = line;
>>>     }
>>>
>>>     public LogLine(int sec, String line) {
>>>         this.sec = sec;
>>>         this.time = UtilDate.getRandomDate(sec);
>>>         this.line = line;
>>>     }
>>>
>>>     public int getSec() {
>>>         return sec;
>>>     }
>>>
>>>     public void setSec(int sec) {
>>>         this.sec = sec;
>>>     }
>>>
>>>     public Date getTime() {
>>>         return time;
>>>     }
>>>
>>>     public String getLine() {
>>>         return line;
>>>     }
>>>
>>>     public void setTime(Date time) {
>>>         this.time = time;
>>>     }
>>>
>>>     public void setLine(String line) {
>>>         this.line = line;
>>>     }
>>>
>>>     @Override
>>>     public boolean equals(Object o) {
>>>         if (this == o) return true;
>>>         if (o == null || getClass() != o.getClass()) return false;
>>>         LogLine logLine = (LogLine) o;
>>>         return Objects.equals(time, logLine.time) &&
>>>                 Objects.equals(sec, logLine.sec) &&
>>>                 Objects.equals(line, logLine.line);
>>>     }
>>>
>>>     @Override
>>>     public int hashCode() {
>>>
>>>         return Objects.hash(time, sec, line);
>>>     }
>>>
>>>     @Override
>>>     public String toString() {
>>>         return "LogLine{" +
>>>                 "time=" + time +
>>>                 ", sec=" + sec +
>>>                 ", line='" + line +
>>>                 '}';
>>>     }
>>> }
>>>
>>>
>>> --
>>>
>>> *---- Felipe Oliveira Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>
>>
>
>
> --
>
> *---- Felipe Oliveira Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>

Re: How to correct use timeWindow() with DataStream?

Posted by Felipe Gutierrez <fe...@gmail.com>.
thanks a lot Fabian,

That clarified how to proceed. I am using the keyBy, timeWindow, and apply
operators in EventTimeStreamExampleJava
<https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/example/streaming/EventTimeStreamExampleJava.java>
now. I am generating dates in order and with a bit out of orderness now at
LogSourceFunction
<https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogSourceFunction.java>.
And only using Date as my key at LogLine
<https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogLine.java>
object.

If I understood watermarks well, my program should combine all the lines
that are inside the same watermark when I set ".timeWindow(Time.seconds(5),
Time.seconds(1))" and used ".apply(new LogLineCounterFunction())". But it
is still not happening because I didn't use a good key ".keyBy(lineLog ->
lineLog.getTime())" and my key at the LogLineCounterFunction class is still
the Date.

public static class LogLineCounterFunction implements WindowFunction<
        LogLine, // input
        Tuple3<LogLine, Long, Integer>, // output
        Date, // key
        TimeWindow> { // window type

What should I use as a key in my case?

My output is combining only the lines with the same key (Date). I want to
combine the dates between the watermarks ".timeWindow(Time.seconds(5),
Time.seconds(1))"...

3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534
| 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 |
2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
16:31:08.534},1071516670000,9)
3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184
| 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15
16:31:04.184},1071516670000,4)
3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884
| 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 |
2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
| 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
16:31:00.884},1071516670000,12)
3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15
16:31:03.784},1071516670000,1)
3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334
| 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15
16:31:06.334},1071516670000,4)




On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> The timestamps of the stream records should be increasing (strict
> monotonicity is not required, a bit out of orderness can be handled due to
> watermarks).
> So, the events should also be generated with increasing timestamps. It
> looks like your generator generates random dates. I'd also generate data
> with millisecond precision, not just days.
>
> Also, a timestamp in Flink is the number of milliseconds since
> 1970-01-01-00:00:00.
> However, your timestamp extractor only returns the number of seconds since
> last minute (i.e., from 0 to 60). You should use Date.getTime() instead of
> Date.getSeconds().
>
> Best, Fabian
>
> 2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <fe...@gmail.com>
> :
>
>> Hi all,
>>
>> I am building an example with DataStream using Flink that has a fake
>> source generator of LogLine(Date d, String line). I want to work with
>> Watermarks on it so I created a class that implements AssignerWithPeriodicWatermarks.
>> If I don't use the monad ".timeWindow(Time.seconds(2))" on the data stream
>> I can group by second and concatenate the lines. When I use
>> ".timeWindow(Time.seconds(2))" nothing is shown on the output. I guess I
>> misunderstood something when I was reading about Event Time. Could anyone
>> help me please? My source code is as follow.
>>
>> Thanks for the ideas. Kind Regards,  Felipe
>>
>> package flink.example.streaming;
>>
>> import flink.util.LogLine;
>> import flink.util.LogSourceFunction;
>> import flink.util.UtilDate;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEn
>> vironment;
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodi
>> cWatermarks;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>>
>> import javax.annotation.Nullable;
>> import java.util.Date;
>>
>> public class EventTimeStreamExampleJava {
>>     public static void main(String[] args) throws Exception {
>>
>>         final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>         DataStream<LogLine> dataStream = env
>>                 .addSource(new LogSourceFunction())
>>                 .assignTimestampsAndWatermarks(new
>> BoundedOutOfOrdernessGenerator())
>>                 .keyBy(lineLog -> lineLog.getSec())
>>                 // .timeWindow(Time.seconds(2))
>>                 .reduce((log1, log2) -> new LogLine(log1.getTime(),
>> log1.getLine() + " | " + log2.getLine()))
>>                 ;
>>
>>         dataStream.print();
>>
>>         env.execute("Window LogRead");
>>     }
>>
>>     public static class BoundedOutOfOrdernessGenerator implements
>> AssignerWithPeriodicWatermarks<LogLine> {
>>
>>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>>
>>         private long currentMaxTimestamp;
>>
>>         @Override
>>         public long extractTimestamp(LogLine element, long
>> previousElementTimestamp) {
>>             long timestamp = element.getTime().getSeconds();
>>             currentMaxTimestamp = Math.max(timestamp,
>> currentMaxTimestamp);
>>             return timestamp;
>>         }
>>
>>         @Nullable
>>         @Override
>>         public Watermark getCurrentWatermark() {
>>             // return the watermark as current highest timestamp minus
>> the out-of-orderness bound
>>             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>>         }
>>     }
>> }
>>
>> package flink.util;
>>
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>
>> public class LogSourceFunction implements SourceFunction<LogLine> {
>>
>>     private volatile boolean isRunning = true;
>>
>>     @Override
>>     public void run(SourceContext<LogLine> ctx) throws Exception {
>>         while (isRunning) {
>>             ctx.collect(new LogLine(UtilDate.getRandomSec(),
>> UtilDate.getRandomString()));
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         isRunning = false;
>>     }
>> }
>>
>> package flink.util;
>>
>> import java.util.Date;
>> import java.util.Objects;
>>
>> public class LogLine {
>>
>>     private Date time;
>>     private int sec;
>>     private String line;
>>
>>     public LogLine() {
>>     }
>>
>>     public LogLine(Date time, String line) {
>>         this.sec = time.getSeconds();
>>         this.time = time;
>>         this.line = line;
>>     }
>>
>>     public LogLine(int sec, String line) {
>>         this.sec = sec;
>>         this.time = UtilDate.getRandomDate(sec);
>>         this.line = line;
>>     }
>>
>>     public int getSec() {
>>         return sec;
>>     }
>>
>>     public void setSec(int sec) {
>>         this.sec = sec;
>>     }
>>
>>     public Date getTime() {
>>         return time;
>>     }
>>
>>     public String getLine() {
>>         return line;
>>     }
>>
>>     public void setTime(Date time) {
>>         this.time = time;
>>     }
>>
>>     public void setLine(String line) {
>>         this.line = line;
>>     }
>>
>>     @Override
>>     public boolean equals(Object o) {
>>         if (this == o) return true;
>>         if (o == null || getClass() != o.getClass()) return false;
>>         LogLine logLine = (LogLine) o;
>>         return Objects.equals(time, logLine.time) &&
>>                 Objects.equals(sec, logLine.sec) &&
>>                 Objects.equals(line, logLine.line);
>>     }
>>
>>     @Override
>>     public int hashCode() {
>>
>>         return Objects.hash(time, sec, line);
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "LogLine{" +
>>                 "time=" + time +
>>                 ", sec=" + sec +
>>                 ", line='" + line +
>>                 '}';
>>     }
>> }
>>
>>
>> --
>>
>> *---- Felipe Oliveira Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>
>


-- 

*---- Felipe Oliveira Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*

Re: How to correct use timeWindow() with DataStream?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

The timestamps of the stream records should be increasing (strict
monotonicity is not required; a bit of out-of-orderness can be handled
thanks to watermarks).
So, the events should also be generated with increasing timestamps. It
looks like your generator generates random dates. I'd also generate data
with millisecond precision, not just days.

Also, a timestamp in Flink is the number of milliseconds since
1970-01-01-00:00:00.
However, your timestamp extractor only returns the number of seconds since
last minute (i.e., from 0 to 60). You should use Date.getTime() instead of
Date.getSeconds().

Best, Fabian

2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <fe...@gmail.com>:

> Hi all,
>
> I am building an example with DataStream using Flink that has a fake
> source generator of LogLine(Date d, String line). I want to work with
> Watermarks on it so I created a class that implements
> AssignerWithPeriodicWatermarks. If I don't use the monad
> ".timeWindow(Time.seconds(2))" on the data stream I can group by second and
> concatenate the lines. When I use ".timeWindow(Time.seconds(2))" nothing is
> shown on the output. I guess I misunderstood something when I was reading
> about Event Time. Could anyone help me please? My source code is as follow.
>
> Thanks for the ideas. Kind Regards,  Felipe
>
> package flink.example.streaming;
>
> import flink.util.LogLine;
> import flink.util.LogSourceFunction;
> import flink.util.UtilDate;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.
> AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> import javax.annotation.Nullable;
> import java.util.Date;
>
> public class EventTimeStreamExampleJava {
>     public static void main(String[] args) throws Exception {
>
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         DataStream<LogLine> dataStream = env
>                 .addSource(new LogSourceFunction())
>                 .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessGenerator())
>                 .keyBy(lineLog -> lineLog.getSec())
>                 // .timeWindow(Time.seconds(2))
>                 .reduce((log1, log2) -> new LogLine(log1.getTime(),
> log1.getLine() + " | " + log2.getLine()))
>                 ;
>
>         dataStream.print();
>
>         env.execute("Window LogRead");
>     }
>
>     public static class BoundedOutOfOrdernessGenerator implements
> AssignerWithPeriodicWatermarks<LogLine> {
>
>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>
>         private long currentMaxTimestamp;
>
>         @Override
>         public long extractTimestamp(LogLine element, long
> previousElementTimestamp) {
>             long timestamp = element.getTime().getSeconds();
>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>             return timestamp;
>         }
>
>         @Nullable
>         @Override
>         public Watermark getCurrentWatermark() {
>             // return the watermark as current highest timestamp minus the
> out-of-orderness bound
>             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>         }
>     }
> }
>
> package flink.util;
>
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> public class LogSourceFunction implements SourceFunction<LogLine> {
>
>     private volatile boolean isRunning = true;
>
>     @Override
>     public void run(SourceContext<LogLine> ctx) throws Exception {
>         while (isRunning) {
>             ctx.collect(new LogLine(UtilDate.getRandomSec(),
> UtilDate.getRandomString()));
>         }
>     }
>
>     @Override
>     public void cancel() {
>         isRunning = false;
>     }
> }
>
> package flink.util;
>
> import java.util.Date;
> import java.util.Objects;
>
> public class LogLine {
>
>     private Date time;
>     private int sec;
>     private String line;
>
>     public LogLine() {
>     }
>
>     public LogLine(Date time, String line) {
>         this.sec = time.getSeconds();
>         this.time = time;
>         this.line = line;
>     }
>
>     public LogLine(int sec, String line) {
>         this.sec = sec;
>         this.time = UtilDate.getRandomDate(sec);
>         this.line = line;
>     }
>
>     public int getSec() {
>         return sec;
>     }
>
>     public void setSec(int sec) {
>         this.sec = sec;
>     }
>
>     public Date getTime() {
>         return time;
>     }
>
>     public String getLine() {
>         return line;
>     }
>
>     public void setTime(Date time) {
>         this.time = time;
>     }
>
>     public void setLine(String line) {
>         this.line = line;
>     }
>
>     @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>         LogLine logLine = (LogLine) o;
>         return Objects.equals(time, logLine.time) &&
>                 Objects.equals(sec, logLine.sec) &&
>                 Objects.equals(line, logLine.line);
>     }
>
>     @Override
>     public int hashCode() {
>
>         return Objects.hash(time, sec, line);
>     }
>
>     @Override
>     public String toString() {
>         return "LogLine{" +
>                 "time=" + time +
>                 ", sec=" + sec +
>                 ", line='" + line +
>                 '}';
>     }
> }
>
>
> --
>
> *---- Felipe Oliveira Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>