Posted to dev@flink.apache.org by Nam-Luc Tran <na...@euranova.eu> on 2016/02/25 18:14:39 UTC

Playing with EventTime in DataStreams

Hello everyone,

I am currently playing with streams whose timestamps are defined by
EventTime. Here is my code:

      final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
      env.getConfig().enableTimestamps();//.setAutoWatermarkInterval(10000);
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

      DataStream<String> input = env.readTextFile("file:///var/log/syslog");
      input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());

      input.timeWindowAll(Time.minutes(5))
            .apply(new AllWindowFunction<Iterable<String>, String, TimeWindow>() {
               @Override
               public void apply(TimeWindow window, Iterable<String> values,
                     Collector<String> out) throws Exception {
                  for (String t : values) {
                     out.collect(t);
                  }
               }
            })
            .print();

(...)

public static final class AssignTimestampFromLogEvent
      extends AscendingTimestampExtractor<String> {
   @Override
   public long extractAscendingTimestamp(String element,
         long previousElementTimestamp) {
      String date = element.substring(0, 15);
      SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
      Date ddate = null;
      try {
         ddate = sdf.parse(date);
      } catch (ParseException e) {
         e.printStackTrace();
      }
      return ddate.getTime();
   }
}


What I expect it to do is to read the syslog, assign timestamps, and compute
5-minute windows *based on the syslog event time*, as I've configured
the stream to do. However, it does not: the windows are computed based on
processing time.

What am I missing here?
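A side note on the AssignTimestampFromLogEvent extractor above, separate from the windowing question: if sdf.parse fails, ddate stays null and ddate.getTime() throws a NullPointerException; month names are parsed with the JVM's default locale; and because syslog lines carry no year, SimpleDateFormat fills in its default year (1970, as far as I know) in the JVM's default time zone. Below is a minimal plain-Java sketch of a stricter parser using java.time (Java 8+). The class name SyslogTimestamps, the explicit year parameter and the UTC zone in the example are illustrative assumptions; it could be called from extractAscendingTimestamp in place of the SimpleDateFormat code, but that has not been tried inside Flink.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;
import java.util.Locale;

// Hypothetical helper: parses the "MMM dd HH:mm:ss" prefix of a syslog line.
final class SyslogTimestamps {
    private SyslogTimestamps() {}

    // Returns epoch milliseconds; the year and zone must be supplied by the caller
    // because classic syslog lines contain neither (assumption: the caller knows them).
    static long parse(String line, int year, ZoneId zone) {
        if (line == null || line.length() < 15) {
            throw new IllegalArgumentException("line too short for a syslog timestamp: " + line);
        }
        DateTimeFormatter fmt = new DateTimeFormatterBuilder()
                .appendPattern("MMM dd HH:mm:ss")
                .parseDefaulting(ChronoField.YEAR, year) // fill in the missing year explicitly
                .toFormatter(Locale.ENGLISH);            // month names do not depend on JVM locale
        try {
            LocalDateTime local = LocalDateTime.parse(line.substring(0, 15), fmt);
            return local.atZone(zone).toInstant().toEpochMilli();
        } catch (DateTimeParseException e) {
            // Fail loudly instead of returning a null Date and failing later with an NPE.
            throw new IllegalArgumentException("unparseable syslog timestamp: " + line, e);
        }
    }

    public static void main(String[] args) {
        long ts = parse("Feb 25 18:14:39 myhost kernel: hello", 2016, ZoneId.of("UTC"));
        System.out.println(ts); // 1456424079000
    }
}
```

Throwing on a malformed line makes bad input visible; whether to drop such lines or fail the job instead is a design choice the original code does not settle.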

Best regards,

-- 

*Nam-Luc TRAN*

R&D Manager

EURA NOVA

(M) +32 498 37 36 23

*euranova.eu <http://euranova.eu>*

Re: Playing with EventTime in DataStreams

Posted by Stephan Ewen <se...@apache.org>.
Nice catch, actually.

I think we should let the timestamp extracting operator emit the current
watermark prior to shutting down.

On Fri, Feb 26, 2016 at 11:49 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> I think the problem is that the source finished before the extractor has
> the chance to emit even a single watermark. This means that the topology
> will shut down and the window operator does not emit in-flight windows upon
> shutdown.
>
> Cheers,
> Aljoscha

Re: Playing with EventTime in DataStreams

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think the problem is that the source finished before the extractor has the chance to emit even a single watermark. This means that the topology will shut down and the window operator does not emit in-flight windows upon shutdown.

Cheers,
Aljoscha
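To make the explanation above concrete, here is a toy model in plain Java (not Flink code) of an event-time tumbling-window operator. It buffers elements per 5-minute window and, modelled on how an event-time trigger behaves as I understand it, emits a window only once a watermark reaches the window's last timestamp (end - 1). If the bounded input ends before any watermark arrives, nothing is emitted; feeding a final watermark of Long.MAX_VALUE, in the spirit of the suggestion in this thread to emit a watermark before shutting down, flushes every pending window. The class name and the sample input are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (NOT Flink code) of an event-time tumbling-window operator.
final class ToyEventTimeWindows {
    private final long size;
    // Window start -> elements buffered for that window, ordered by start time.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    ToyEventTimeWindows(long size) {
        this.size = size;
    }

    // Assigns the element to its tumbling window [start, start + size); assumes ts >= 0.
    void onElement(String value, long ts) {
        long start = ts - (ts % size);
        pending.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    // Emits every buffered window whose last timestamp (start + size - 1) is <= watermark.
    void onWatermark(long watermark) {
        while (!pending.isEmpty()) {
            Map.Entry<Long, List<String>> first = pending.firstEntry();
            if (first.getKey() + size - 1 > watermark) {
                break; // later windows end even later, so nothing else can fire yet
            }
            output.addAll(first.getValue());
            pending.pollFirstEntry();
        }
    }

    List<String> output() { return output; }

    int pendingWindows() { return pending.size(); }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;

        // Case 1: the bounded input ends before a single watermark is emitted.
        ToyEventTimeWindows noWatermark = new ToyEventTimeWindows(fiveMinutes);
        feed(noWatermark);
        System.out.println(noWatermark.output()); // []

        // Case 2: a final Long.MAX_VALUE watermark at end of input flushes every window.
        ToyEventTimeWindows flushed = new ToyEventTimeWindows(fiveMinutes);
        feed(flushed);
        flushed.onWatermark(Long.MAX_VALUE);
        System.out.println(flushed.output()); // [a, b, c]
    }

    // Hypothetical input: two events in the first window, one in the second.
    private static void feed(ToyEventTimeWindows op) {
        op.onElement("a", 0L);
        op.onElement("b", 60_000L);
        op.onElement("c", 6 * 60_000L);
    }
}
```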
> On 26 Feb 2016, at 11:40, Nam-Luc Tran <na...@euranova.eu> wrote:
> 
> Great, that did it, thanks Robert ;)
> 
> While I'm at it:
> Sometimes results are correctly returned, sometimes, the output of the job
> (print or writeAsText)  is plain empty, like the job finished too quickly
> before the results are written. One way of "forcing" results to happen is
> to insert a "delay" in the source stream, as with a FlatMap:
> 
>      @Override
>      public void flatMap(String value, Collector<String> out)
>            throws Exception {
>         Thread.sleep(1);
>         out.collect(value.toLowerCase());
>         }
> 
> Am I missing anything here?
> 
> Best regards,
> 
> 


Re: Playing with EventTime in DataStreams

Posted by Nam-Luc Tran <na...@euranova.eu>.
Great, that did it, thanks Robert ;)

While I'm at it:
Sometimes the results are returned correctly; other times the output of the
job (print or writeAsText) is completely empty, as if the job finished too
quickly for the results to be written. One way of "forcing" results is to
insert a "delay" in the source stream, for example with a FlatMap:

      @Override
      public void flatMap(String value, Collector<String> out) throws Exception {
         Thread.sleep(1);
         out.collect(value.toLowerCase());
      }

Am I missing anything here?

Best regards,
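A rough way to see why the Thread.sleep(1) workaround helps, under the explanation given in this thread: watermarks from the timestamp extractor are emitted by a timer at a fixed wall-clock interval, so a source that reads the whole file faster than one interval shuts down before the first watermark. The toy arithmetic below (plain Java, not Flink code) counts how many timer firings fit into the source's runtime; the 200 ms interval, the file size and the per-line costs are assumptions for illustration only.

```java
// Toy arithmetic model (assumption, not Flink code): a periodic watermark
// timer fires every intervalMillis of wall-clock time while the source runs.
final class PeriodicWatermarkTicks {
    private PeriodicWatermarkTicks() {}

    // How many periodic watermark emissions fit into the source's runtime.
    static long ticksBeforeEnd(long records, long microsPerRecord, long intervalMillis) {
        if (records < 0 || microsPerRecord < 0 || intervalMillis <= 0) {
            throw new IllegalArgumentException("invalid arguments");
        }
        long runtimeMicros = records * microsPerRecord;
        return runtimeMicros / (intervalMillis * 1000L);
    }

    public static void main(String[] args) {
        long lines = 10_000; // hypothetical size of the syslog file
        long interval = 200; // assumed watermark interval in ms
        // Fast read: ~1 microsecond per line -> 10 ms in total, the timer never fires.
        System.out.println(ticksBeforeEnd(lines, 1, interval));     // 0
        // With Thread.sleep(1) per line: at least 10 s in total, 50 timer firings.
        System.out.println(ticksBeforeEnd(lines, 1_000, interval)); // 50
    }
}
```

In this model the delay only gives the timer a chance to fire, so the outcome depends on timing. Also, with an ascending-timestamp watermark that trails the last element, the window holding the last lines still waits for a watermark beyond its end, which the periodic timer alone does not produce once the input is exhausted.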


2016-02-25 20:05 GMT+01:00 Robert Metzger <rm...@apache.org>:

> Hi,
>
> I had a similar issue recently.
> Instead of
>  input.assignTimestampsAndWatermarks
>
> you have to do:
>
>  input = input.assignTimestampsAndWatermarks
>




Re: Playing with EventTime in DataStreams

Posted by Robert Metzger <rm...@apache.org>.
Hi,

I had a similar issue recently.
Instead of
 input.assignTimestampsAndWatermarks

you have to do:

 input = input.assignTimestampsAndWatermarks
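Why the reassignment matters, sketched with a hypothetical toy class (ToyStream, not Flink's DataStream): stream transformations such as assignTimestampsAndWatermarks return a new stream instead of modifying the one they are called on. In the original code the returned stream was discarded, so timeWindowAll was applied to the stream without the extracted timestamps; the fix is input = input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent()); before windowing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy stand-in (NOT Flink's DataStream) for an immutable stream description.
final class ToyStream {
    private final List<String> operators;

    ToyStream(List<String> operators) {
        this.operators = Collections.unmodifiableList(new ArrayList<>(operators));
    }

    static ToyStream source(String name) {
        return new ToyStream(Collections.singletonList(name));
    }

    // Like a stream transformation: returns a NEW stream and leaves this one untouched.
    ToyStream assignTimestamps(String assignerName) {
        List<String> next = new ArrayList<>(operators);
        next.add("timestamps(" + assignerName + ")");
        return new ToyStream(next);
    }

    List<String> operators() { return operators; }

    public static void main(String[] args) {
        ToyStream input = ToyStream.source("readTextFile");
        input.assignTimestamps("AssignTimestampFromLogEvent"); // result discarded: input unchanged
        System.out.println(input.operators()); // [readTextFile]
        input = input.assignTimestamps("AssignTimestampFromLogEvent"); // reassign, as suggested
        System.out.println(input.operators()); // [readTextFile, timestamps(AssignTimestampFromLogEvent)]
    }
}
```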
