Posted to user@flink.apache.org by dim5b <dm...@gmail.com> on 2018/03/12 13:37:34 UTC
Global Window, Trigger and Watermarks, Parallelism
Could someone clarify exactly how event time, watermarks and allowed lateness
work? I have created the program below, and I have an input file such as:
device_id,trigger_id,event_time,messageId
1,START,1520433909396,1
1,TRACKING,1520433914398,2
1,TRACKING,1520433919398,3
1,STOP,1520433924398,4
1,START,1520433929398,5
1,TRACKING,1520433934399,6
1,TRACKING,1520433939399,7
1,TRACKING,1520433944399,8
1,STOP,1520433949399,9
Here trigger_id is an indicator such as START, TRACKING or STOP. What I
would like to do is group all incoming events by device_id and define
a window based on the trigger_id, i.e. group all events from START until STOP
and then do some calculations such as average, max etc.
I call env.readTextFile("events.csv"); and set the StreamTimeCharacteristic to
EventTime. Parallelism is set to 4. This means the messages from my source
file are read, but not in order... (I have added messageId as a counter
for dev purposes only.)
I have defined a custom trigger that finds STOP events and fires in order to
evict all collected events. My main problem is that if parallelism is
increased from 1, the input source reads the events out of order.
Shouldn't event time and watermarks resolve this issue? How do I handle
possible out-of-order events?
public class SimpleEventJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(4);
        //env.setParallelism(1);
        DataStream<String> input = env.readTextFile("events.csv");
        // create event stream
        DataStream<Event> events = input.map(new LineToEvent());
        DataStream<Event> waterMarkedStreams =
                events.assignTimestampsAndWatermarks(new EventWAssigner());
        DataStream<TripEv> tripStream = waterMarkedStreams.keyBy("deviceId")
                .window(GlobalWindows.create())
                .trigger(new TripTrigger())
                .evictor(new TripEvictor())
                .apply(new CreateTrip());
        tripStream.print();
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    public static class TripTrigger extends Trigger<Event, GlobalWindow> {
        @Override
        public TriggerResult onElement(Event event, long timestamp,
                GlobalWindow window, TriggerContext context) throws Exception {
            // if this is a stop event (trigger code 53), fire immediately
            if (event.getTrigger() == 53) {
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window,
                TriggerContext ctx) {
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window,
                TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) {
        }
    }

    private static class TripEvictor implements Evictor<Event, GlobalWindow> {
        @Override
        public void evictBefore(Iterable<TimestampedValue<Event>> events,
                int size, GlobalWindow window, EvictorContext ctx) {
        }

        @Override
        public void evictAfter(Iterable<TimestampedValue<Event>> elements,
                int size, GlobalWindow window, EvictorContext ctx) {
            System.out.println(elements);
            long firstStop = Event.earliestStopElement(elements);
            // remove all events up to (and including) the first stop event
            // (which is the event that triggered the window)
            for (Iterator<TimestampedValue<Event>> iterator = elements.iterator();
                    iterator.hasNext(); ) {
                TimestampedValue<Event> element = iterator.next();
                if (element.getTimestamp() <= firstStop) {
                    iterator.remove();
                }
            }
        }
    }

    public static class CreateTrip
            implements WindowFunction<Event, TripEv, Tuple, GlobalWindow> {
        @Override
        public void apply(Tuple key, GlobalWindow window, Iterable<Event> events,
                Collector<TripEv> out) {
            TripEv trp = new TripEv(events);
            if (trp.length > 0) {
                out.collect(trp);
            }
        }
    }

    private static class LineToEvent implements MapFunction<String, Event> {
        @Override
        public Event map(String line) throws Exception {
            return Event.fromString(line);
        }
    }

    private static class EventWAssigner
            implements AssignerWithPunctuatedWatermarks<Event> {
        @Override
        public long extractTimestamp(Event event, long previousElementTimestamp) {
            return event.getTimestamp();
        }

        @Override
        public Watermark checkAndGetNextWatermark(Event event,
                long extractedTimestamp) {
            // emit a watermark with every event, trailing it by one second
            return new Watermark(extractedTimestamp - 1000L);
        }
    }
}
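As a side note on why parallelism matters for event time, here is a minimal
plain-Java sketch (no Flink dependency; the class name CombinedWatermark is
made up for illustration). It assumes the usual Flink behaviour that an
operator with several input channels sets its event-time clock to the minimum
of the latest watermark seen on each channel. What it is meant to show is that
a watermark only says how far event time has progressed; it does not reorder
the events that arrive.

```java
import java.util.Arrays;

/** Plain-Java illustration of combining watermarks from parallel inputs. */
final class CombinedWatermark {
    // Latest watermark seen on each input channel; Long.MIN_VALUE means "none yet".
    private final long[] perChannel;

    CombinedWatermark(int channels) {
        perChannel = new long[channels];
        Arrays.fill(perChannel, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the combined event time. */
    long onWatermark(int channel, long watermark) {
        // A channel's watermark never moves backwards.
        perChannel[channel] = Math.max(perChannel[channel], watermark);
        // The combined event time is held back by the slowest channel.
        return Arrays.stream(perChannel).min().getAsLong();
    }

    public static void main(String[] args) {
        CombinedWatermark wm = new CombinedWatermark(2);
        // Channel 0 has already read the STOP at 1520433924398 (watermark = timestamp - 1000).
        System.out.println(wm.onWatermark(0, 1520433924398L - 1000L));
        // Channel 1 has only read the START at 1520433909396, so it holds event time back.
        System.out.println(wm.onWatermark(1, 1520433909396L - 1000L));
    }
}
```

With the one-second lag used by EventWAssigner, the combined event time stays
at the slower channel's value, even though the STOP event itself may already
have reached the window operator and fired the trigger.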
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Global Window, Trigger and Watermarks, Parallelism
Posted by dim5b <dm...@gmail.com>.
After adding AfterMatchSkipStrategy.skipPastLastEvent(), it returns what I
want.
Is there a way to track/emit "ongoing" events, i.e. before the pattern matches
the end event type?
Re: Global Window, Trigger and Watermarks, Parallelism
Posted by dim5b <dm...@gmail.com>.
I see you replied on
https://stackoverflow.com/questions/48028061/flink-cep-greedy-matching
pointing to a known bug, tracked in
https://issues.apache.org/jira/browse/FLINK-8914
In my case the pattern looks like this:
Pattern<Event, Event> tripPattern =
    Pattern.<Event>begin("start").times(1).where(START_CONDITION)
        .followedBy("middle").where(MIDDLE_CONDITION).oneOrMore()
        .next("end").where(END_CONDITION);
The end result is:
1> 1,2,3,4,
4> 1,2,3,6,7,8,11,12,
1> 5,6,7,8,11,12,
3> 5,6,7,8,9,
2> 1,2,3,6,7,8,9,
2> 10,11,12,
By using the next operator I limit the terminating side, but begin still
takes all possible solutions.
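To illustrate what a skip-past-last-event strategy changes compared with the
overlapping matches above, here is a plain-Java sketch. It is not Flink CEP
and does not reproduce its matching engine; the class name SkipPastLastMatch
and the 12-event input (inferred from the message ids in the output above)
are assumptions made for illustration. Once a START -> TRACKING+ -> STOP
sequence has matched, scanning resumes after the STOP, so no event is reused.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of non-overlapping START -> TRACKING+ -> STOP matching. */
final class SkipPastLastMatch {

    /** Scans event types in order and returns the 1-based message ids of each match. */
    static List<List<Integer>> match(List<String> types) {
        List<List<Integer>> matches = new ArrayList<>();
        List<Integer> current = null;   // ids of the open partial match, or null if none
        boolean sawTracking = false;
        for (int i = 0; i < types.size(); i++) {
            String type = types.get(i);
            int id = i + 1;
            if (type.equals("START")) {
                // A START opens (or restarts) a partial match.
                current = new ArrayList<>();
                current.add(id);
                sawTracking = false;
            } else if (current != null && type.equals("TRACKING")) {
                current.add(id);
                sawTracking = true;
            } else if (current != null && type.equals("STOP") && sawTracking) {
                current.add(id);
                matches.add(current);
                current = null;         // skip past the last event of the match
            } else {
                // Anything else (e.g. STOP directly after START) discards the partial match.
                current = null;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> types = Arrays.asList(
                "START", "TRACKING", "TRACKING", "STOP",
                "START", "TRACKING", "TRACKING", "TRACKING", "STOP",
                "START", "TRACKING", "STOP");
        // prints [[1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12]]
        System.out.println(match(types));
    }
}
```

Each event ends up in at most one match, in contrast to combinations such as
1,2,3,6,7,8,9 in the output above.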
Re: Global Window, Trigger and Watermarks, Parallelism
Posted by dim5b <dm...@gmail.com>.
I have looked into the CEP library and have posted a question on
Stack Overflow:
https://stackoverflow.com/questions/49047879/global-windows-in-flink-using-custom-triggers-vs-flink-cep-pattern-api
However, the pattern matches all possible solutions on the stream of
events. Does a pattern have a notion of an Evictor? How can you keep only a
specific set of events?
This question has not been answered, and there seems to be a similar question:
https://stackoverflow.com/questions/48028061/flink-cep-greedy-matching
Thanks
Re: Global Window, Trigger and Watermarks, Parallelism
Posted by Chesnay Schepler <ch...@apache.org>.
Hello,
Event time and watermarks can be used to deal with out-of-order events,
but since you're using global windows (as opposed to time-based windows)
you have to implement the logic for doing this yourself.
Conceptually, what you would have to do is create your TripEv not
when receiving a STOP event, but when a given amount of time has passed
after you received it. Your CreateTrip function would have to scan the
window contents for START -> STOP sequences and compare the timestamp of
STOP with the current event time. (I think for this you would have to
extend ProcessWindowFunction instead.)
The evictor would have to use the same logic to detect sequences that
were already processed.
I suggest looking into our CEP
<https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html>
library, as this looks like a perfect use case for it.
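The approach described above can be sketched in plain Java, without Flink, as
follows. This is only an illustration under stated assumptions: the names
DelayedTripEmitter, Ev and onWatermark are hypothetical, and in a real job the
same logic would have to be spread across the trigger, the window function and
the evictor. Events are buffered in whatever order they arrive, and a
START -> STOP sequence is emitted only once the watermark has reached the
timestamp of its STOP.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java illustration of emitting a trip only once the watermark passes its STOP. */
final class DelayedTripEmitter {

    /** Hypothetical event shape, modelled on the CSV columns in the original post. */
    static final class Ev {
        final String type;      // START, TRACKING or STOP
        final long timestamp;   // event time in milliseconds
        final int messageId;

        Ev(String type, long timestamp, int messageId) {
            this.type = type;
            this.timestamp = timestamp;
            this.messageId = messageId;
        }
    }

    private final List<Ev> buffer = new ArrayList<>();

    /** Buffers an event; arrival order does not matter. */
    void add(Ev e) {
        buffer.add(e);
    }

    /**
     * Called when event time advances. Returns the message ids of every
     * START..STOP sequence whose STOP is at or before the watermark and
     * removes those events from the buffer.
     */
    List<List<Integer>> onWatermark(long watermark) {
        buffer.sort(Comparator.comparingLong((Ev e) -> e.timestamp));
        List<List<Integer>> trips = new ArrayList<>();
        List<Ev> consumed = new ArrayList<>();
        List<Ev> open = null;   // events of the trip currently being assembled
        for (Ev e : buffer) {
            if (e.timestamp > watermark) {
                break;          // earlier stragglers may still arrive for this part
            }
            if (e.type.equals("START")) {
                open = new ArrayList<>();
            }
            if (open != null) {
                open.add(e);
                if (e.type.equals("STOP")) {
                    List<Integer> ids = new ArrayList<>();
                    for (Ev x : open) {
                        ids.add(x.messageId);
                    }
                    trips.add(ids);
                    consumed.addAll(open);   // plays the role of the evictor
                    open = null;
                }
            }
        }
        buffer.removeAll(consumed);
        return trips;
    }
}
```

Feeding the first four events of the sample file in any order and then calling
onWatermark with a value at or above the STOP timestamp yields one trip with
message ids 1 to 4. Events that never belong to a completed sequence simply
stay in the buffer in this sketch; a real implementation would need a cleanup
policy for them.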