Posted to user@flink.apache.org by dim5b <dm...@gmail.com> on 2018/03/12 13:37:34 UTC

Global Window, Trigger and Watermarks, Parallelism

Could someone clarify how exactly event time/watermarks and allowed lateness
work? I have created the program below, and I have an input file such as:

  device_id,trigger_id,event_time,messageId
    1,START,1520433909396,1
    1,TRACKING,1520433914398,2
    1,TRACKING,1520433919398,3
    1,STOP,1520433924398,4
    1,START,1520433929398,5
    1,TRACKING,1520433934399,6
    1,TRACKING,1520433939399,7
    1,TRACKING,1520433944399,8
    1,STOP,1520433949399,9

Here trigger_id is an indicator such as START, TRACKING or STOP. What I
would like to do is group all incoming events by device_id and define
a window based on the trigger_id, i.e. group all events from START until STOP
and then do some calculations such as average, max, etc.

I call env.readTextFile("events.csv") and set the stream time characteristic
to EventTime. Parallelism is set to 4. This means the messages from the source
file are read, but not in order. (I have added messageId as a counter
for development purposes only.)

I have defined a custom trigger that detects STOP events and fires so that
all collected events can be evicted. My main problem is that if parallelism is
increased above 1, the input source reads the events out of order.

Shouldn't event time and watermarks resolve this issue? How do I handle
possible out-of-order events?

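import java.util.Iterator;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.util.Collector;

// Event and TripEv are application classes that are not shown in this post.
public class SimpleEventJob {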

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(4);
		//env.setParallelism(1);
		DataStream<String> input = env.readTextFile("events.csv");
		// create event stream
		DataStream<Event> events = input.map(new LineToEvent());
		DataStream<Event> waterMarkedStreams =
				events.assignTimestampsAndWatermarks(new EventWAssigner());
		DataStream<TripEv> tripStream = waterMarkedStreams.keyBy("deviceId")
				.window(GlobalWindows.create())
				.trigger(new TripTrigger())
				.evictor(new TripEvictor())
				.apply(new CreateTrip());
		tripStream.print();
		// execute program
		env.execute("Flink Streaming Java API Skeleton");
	}
	
	public static class TripTrigger extends Trigger<Event, GlobalWindow> {
		@Override
		public TriggerResult onElement(Event event, long timestamp,
				GlobalWindow window, TriggerContext context) throws Exception {
			// fire immediately on a stop event (trigger code 53); no timer is registered
			if (event.getTrigger() == 53) {
				return TriggerResult.FIRE;
			}
			return TriggerResult.CONTINUE;
		}

		@Override
		public TriggerResult onEventTime(long time, GlobalWindow window,
				TriggerContext ctx) {
			// only reached if an event-time timer was registered, which onElement never does
			return TriggerResult.FIRE;
		}

		@Override
		public TriggerResult onProcessingTime(long time, GlobalWindow window,
				TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public void clear(GlobalWindow window, TriggerContext ctx) {
		}
	}

	private static class TripEvictor implements Evictor<Event, GlobalWindow> {
		@Override
		public void evictBefore(Iterable<TimestampedValue<Event>> events,
				int size, GlobalWindow window, EvictorContext ctx) {
		}

		@Override
		public void evictAfter(Iterable<TimestampedValue<Event>> elements,
				int size, GlobalWindow window, EvictorContext ctx) {
			System.out.println(elements);
			long firstStop = Event.earliestStopElement(elements);
			// remove all events up to (and including) the first stop event
			// (which is the event that triggered the window)
			for (Iterator<TimestampedValue<Event>> iterator = elements.iterator();
					iterator.hasNext(); ) {
				TimestampedValue<Event> element = iterator.next();
				if (element.getTimestamp() <= firstStop) {
					iterator.remove();
				}
			}
		}
	}

	public static class CreateTrip
			implements WindowFunction<Event, TripEv, Tuple, GlobalWindow> {
		@Override
		public void apply(Tuple key, GlobalWindow window, Iterable<Event> events,
				Collector<TripEv> out) {
			TripEv trp = new TripEv(events);
			if (trp.length > 0) {
				out.collect(trp);
			}
		}
	}

	private static class LineToEvent implements MapFunction<String, Event> {
		@Override
		public Event map(String line) throws Exception {
			return Event.fromString(line);
		}
	}

	private static class EventWAssigner
			implements AssignerWithPunctuatedWatermarks<Event> {
		@Override
		public long extractTimestamp(Event event, long previousElementTimestamp) {
			return event.getTimestamp();
		}

		@Override
		public Watermark checkAndGetNextWatermark(Event event,
				long extractedTimestamp) {
			// emit a watermark with every event, trailing its timestamp by one second
			return new Watermark(extractedTimestamp - 1000L);
		}
	}
}







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Global Window, Trigger and Watermarks, Parallelism

Posted by dim5b <dm...@gmail.com>.
Adding AfterMatchSkipStrategy.skipPastLastEvent() returns what I want.

Is there a way to track/emit "ongoing" events, i.e. before the pattern
matches the end event type?






Re: Global Window, Trigger and Watermarks, Parallelism

Posted by dim5b <dm...@gmail.com>.
I see you replied to

https://stackoverflow.com/questions/48028061/flink-cep-greedy-matching

pointing to a known bug tracked at

https://issues.apache.org/jira/browse/FLINK-8914

In my case the pattern looks like this:

Pattern<Event, Event> tripPattern =
        Pattern.<Event>begin("start").times(1).where(START_CONDITION)
                .followedBy("middle").where(MIDDLE_CONDITION).oneOrMore()
                .next("end").where(END_CONDITION);

The end result is:

1> 1,2,3,4,
4> 1,2,3,6,7,8,11,12,
1> 5,6,7,8,11,12,
3> 5,6,7,8,9,
2> 1,2,3,6,7,8,9,
2> 10,11,12,

By using the next operator I constrain the terminating end of the match, but
the beginning still produces every possible combination.
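
For illustration only, the effect of skipping past the last event of a match
can be approximated on the match lists above with plain Java. This is a rough
sketch and not how Flink CEP is implemented: the class and method names are
hypothetical, each match is represented by its messageId values, and the
matches are assumed to be ordered by their last event.

```java
import java.util.ArrayList;
import java.util.List;

class SkipPastLastEventDemo {
    /**
     * Keeps a match only if its first event comes after the last event
     * of the previously kept match; all overlapping matches are dropped.
     * Assumes the input is ordered by the last event of each match.
     */
    static List<List<Integer>> skipPastLastEvent(List<List<Integer>> matches) {
        List<List<Integer>> kept = new ArrayList<>();
        int lastEnd = Integer.MIN_VALUE;
        for (List<Integer> match : matches) {
            int first = match.get(0);
            int last = match.get(match.size() - 1);
            if (first > lastEnd) {
                kept.add(match);
                lastEnd = last;
            }
        }
        return kept;
    }
}
```

Applied to the six matches listed above, only 1,2,3,4 and 5,6,7,8,9 and
10,11,12 remain, i.e. one match per START ... STOP sequence.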




Re: Global Window, Trigger and Watermarks, Parallelism

Posted by dim5b <dm...@gmail.com>.
I have looked into the CEP library and posted a question on
Stack Overflow:

https://stackoverflow.com/questions/49047879/global-windows-in-flink-using-custom-triggers-vs-flink-cep-pattern-api

However, the pattern matches every possible combination in the stream of
events. Does a pattern have a notion of an evictor? How can I keep only one
specific set of events?

This question has not been answered, and there seems to be a similar question:

https://stackoverflow.com/questions/48028061/flink-cep-greedy-matching

Thanks




Re: Global Window, Trigger and Watermarks, Parallelism

Posted by Chesnay Schepler <ch...@apache.org>.
Hello,

Event time and watermarks can be used to deal with out-of-order events, 
but since you're using global windows (as opposed to time-based windows) 
you have to implement the logic for doing this yourself.
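
As background on why parallelism matters here: watermarks do not reorder
records, they only signal how far event time has progressed. An operator with
several parallel inputs takes the minimum of the watermarks seen on its
inputs as its own event-time clock. A minimal plain-Java sketch of that rule
follows; the class and method names are hypothetical and not part of the
Flink API.

```java
import java.util.Arrays;

class MinWatermarkTracker {
    private final long[] channelWatermarks;

    MinWatermarkTracker(int channels) {
        channelWatermarks = new long[channels];
        // no channel has reported a watermark yet
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark from one input channel and returns the combined
     * watermark, which is the minimum over all channels.
     */
    long onWatermark(int channel, long watermark) {
        // a channel's watermark never moves backwards
        channelWatermarks[channel] =
                Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

The combined clock only advances once every channel has advanced, so records
from a faster channel can reach the window long before the watermark covers
them.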

Conceptually, what you would have to do is create your TripEv not 
when receiving a STOP event, but once a given amount of time has passed 
after you received it. Your CreateTrip function would have to scan the 
window contents for START -> STOP sequences and check the timestamp of 
STOP against the current event time. (I think for this you would have to 
extend ProcessWindowFunction instead.)
The evictor would have to use the same logic to detect sequences that 
were already processed.
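
The approach described above can be sketched without any Flink dependency.
The following is a plain-Java illustration, not Flink code: TripBuffer, Ev
and onWatermark are hypothetical names, and in a real job the buffer would
live in window or keyed state and the release would be driven by event time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class TripBuffer {
    /** Minimal stand-in for the Event class of the original post. */
    static final class Ev {
        final String type;     // START, TRACKING or STOP
        final long timestamp;  // event time in milliseconds
        final int messageId;

        Ev(String type, long timestamp, int messageId) {
            this.type = type;
            this.timestamp = timestamp;
            this.messageId = messageId;
        }
    }

    private final List<Ev> buffer = new ArrayList<>();

    /** Buffers an event in whatever order it arrives. */
    void add(Ev event) {
        buffer.add(event);
    }

    /**
     * Emits every START -> STOP sequence whose STOP timestamp is covered
     * by the watermark, in event-time order, and drops the emitted events
     * (plus anything that preceded them) from the buffer.
     */
    List<List<Ev>> onWatermark(long watermark) {
        buffer.sort(Comparator.comparingLong(e -> e.timestamp));
        List<List<Ev>> trips = new ArrayList<>();
        int start = -1;    // index of the most recent unmatched START
        int consumed = 0;  // one past the last event of the last emitted trip
        for (int i = 0; i < buffer.size(); i++) {
            Ev e = buffer.get(i);
            if (e.timestamp > watermark) {
                break;  // later events may still be preceded by late arrivals
            }
            if (e.type.equals("START")) {
                start = i;
            } else if (e.type.equals("STOP") && start >= 0) {
                trips.add(new ArrayList<>(buffer.subList(start, i + 1)));
                consumed = i + 1;
                start = -1;
            }
        }
        buffer.subList(0, consumed).clear();
        return trips;
    }
}
```

With the sample input, if messages 2, 1 and 4 arrive first and message 3
arrives late, no trip is emitted until the watermark reaches the timestamp of
message 4; the trip then contains messages 1 to 4 in event-time order.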

I suggest looking into our CEP 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html> 
library, as this looks like a perfect use case for it.
