Posted to issues@flink.apache.org by "Kostas Kloudas (JIRA)" <ji...@apache.org> on 2017/05/11 14:54:04 UTC

[jira] [Commented] (FLINK-6297) CEP timeout does not trigger under certain conditions

    [ https://issues.apache.org/jira/browse/FLINK-6297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006566#comment-16006566 ] 

Kostas Kloudas commented on FLINK-6297:
---------------------------------------

Hi [~vijayakumarpl]. Thanks for reporting this. 

I tried to reproduce the bug on both Flink 1.2 and 1.3, but I could not.

Could you post the code that produced this bug here? This will help me pin down the source of the problem (if there is one).
This is the code that I ran on 1.2:

{{{
public static void main(String[] args) throws Exception {

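		// Same sequence as in the report: A-1 @ 1, B-1 @ 2, A-2 @ 2, A-3 @ 5 (milliseconds here).
		// MyEvent is not shown; its arguments appear to be (id, payload, sequence number, timestamp).
		List<MyEvent> inputElements = new ArrayList<>();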
		inputElements.add(new MyEvent(1, 'a', 1, 1));
		inputElements.add(new MyEvent(1, 'b', 1, 2));
		inputElements.add(new MyEvent(1, 'a', 2, 2));
		inputElements.add(new MyEvent(1, 'a', 3, 5));

		Pattern<MyEvent, ?> pattern = Pattern.<MyEvent>begin("a").where(new FilterFunction<MyEvent>() {
			private static final long serialVersionUID = 7219646616484327688L;

			@Override
			public boolean filter(MyEvent myEvent) throws Exception {
				return myEvent.getPayload() == 'a';
			}
		}).next("b").where(new FilterFunction<MyEvent>() {
			private static final long serialVersionUID = 7219646616484327688L;

			@Override
			public boolean filter(MyEvent myEvent) throws Exception {
				return myEvent.getPayload() == 'b';
			}
		}).within(Time.milliseconds(2L));

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.getConfig().setAutoWatermarkInterval(20000);

		DataStream<MyEvent> input = env.fromCollection(inputElements).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
			private static final long serialVersionUID = -6619787346214245526L;

			@Override
			public long extractAscendingTimestamp(MyEvent myEvent) {
				return myEvent.getTimestamp();
			}
		});

		PatternStream<MyEvent> patternStream = CEP.pattern(input.keyBy(new KeySelector<MyEvent, Long>() {
			private static final long serialVersionUID = 6928745840509494198L;

			@Override
			public Long getKey(MyEvent myEvent) throws Exception {
				return myEvent.getId();
			}
		}), pattern);


		patternStream.select(new PatternTimeoutFunction<MyEvent, String>() {
			@Override
			public String timeout(Map<String, MyEvent> map, long l) throws Exception {
				return map.toString() +" @ "+ l;
			}

			private static final long serialVersionUID = 300759199619789416L;


		}, new PatternSelectFunction<MyEvent, String>() {

			@Override
			public String select(Map<String, MyEvent> map) throws Exception {
				return map.toString();
			}

			private static final long serialVersionUID = 732172159423132724L;
		}).print();

		env.execute("Bug Reproducing Job");
	}
}}}
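
The job above relies on a MyEvent class that is not included in the comment. The following is a minimal sketch of what such a class might look like. It is an assumption, not the class actually used: the field order (id, payload, sequence number, timestamp) and the name getSequence are inferred or invented from the constructor calls and the getters (getId, getPayload, getTimestamp) that appear in the code.

```java
import java.io.Serializable;

// Hypothetical event type for the reproduction job; not the original class.
class MyEvent implements Serializable {
    private static final long serialVersionUID = 1L;

    private final long id;        // key returned by the KeySelector
    private final char payload;   // 'a' or 'b', checked by the pattern filters
    private final int sequence;   // assumed: the N in "Event A-N" / "Event B-N"
    private final long timestamp; // event time read by the timestamp extractor

    public MyEvent(long id, char payload, int sequence, long timestamp) {
        this.id = id;
        this.payload = payload;
        this.sequence = sequence;
        this.timestamp = timestamp;
    }

    public long getId() { return id; }

    public char getPayload() { return payload; }

    public int getSequence() { return sequence; }

    public long getTimestamp() { return timestamp; }

    @Override
    public String toString() {
        return "MyEvent(" + id + ", " + payload + ", " + sequence + ", " + timestamp + ")";
    }
}
```

With this shape, new MyEvent(1, 'a', 2, 2) would correspond to "Event A-2[time: 2 sec]" in the report, scaled down to milliseconds.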


> CEP timeout does not trigger under certain conditions
> -----------------------------------------------------
>
>                 Key: FLINK-6297
>                 URL: https://issues.apache.org/jira/browse/FLINK-6297
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.2.0
>            Reporter: Vijayakumar Palaniappan
>
> TimeoutPattern does not trigger under certain conditions. The preconditions are as follows:
> - Assume a pattern of Event A followed by Event B within 2 seconds
> - Periodic watermarks every 1 second
> - Assume the following events have arrived:
> - Event A-1 [time: 1 sec]
> - Event B-1 [time: 2 sec]
> - Event A-2 [time: 2 sec]
> - Event A-3 [time: 5 sec]
> - Watermark [time: 5 sec]
> I would expect that, after the watermark arrives, the match A-1, B-1 is detected and A-2 times out. However, the A-2 timeout does not happen.
> If I use a punctuated watermark and generate a watermark for every event, it seems to work as expected.
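
The expectation described in the report can be sketched as a small standalone model. This is not Flink's CEP implementation and uses no Flink API. It is a simplified, hypothetical model (class and method names are invented) that buffers events until a watermark arrives, pairs each B with the most recent unmatched A, and times out any unmatched A once the window has elapsed. The choice of >= at the window boundary is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Simplified model of "A followed by B within a window" in event time.
class TimeoutExpectationModel {
    private static final class Event {
        final String name;
        final char type;
        final long time;

        Event(String name, char type, long time) {
            this.name = name;
            this.type = type;
            this.time = time;
        }
    }

    private final long window;
    private final List<Event> buffered = new ArrayList<>(); // waiting for a watermark
    private final List<Event> pending = new ArrayList<>();  // unmatched A events
    final List<String> output = new ArrayList<>();

    TimeoutExpectationModel(long window) {
        this.window = window;
    }

    // Events are only buffered; nothing is evaluated until a watermark arrives.
    void onEvent(String name, char type, long time) {
        buffered.add(new Event(name, type, time));
    }

    // Process buffered events up to the watermark in timestamp order,
    // then expire partial matches against the watermark itself.
    void onWatermark(long watermark) {
        buffered.sort(Comparator.comparingLong(e -> e.time)); // stable sort
        Iterator<Event> it = buffered.iterator();
        while (it.hasNext()) {
            Event e = it.next();
            if (e.time > watermark) {
                break;
            }
            it.remove();
            process(e);
        }
        expire(watermark);
    }

    private void process(Event e) {
        expire(e.time);
        if (e.type == 'a') {
            pending.add(e);
        } else if (e.type == 'b' && !pending.isEmpty()) {
            Event a = pending.remove(pending.size() - 1);
            output.add("match " + a.name + "," + e.name);
        }
    }

    // Every unmatched A whose window has elapsed at time 'now' times out.
    private void expire(long now) {
        Iterator<Event> it = pending.iterator();
        while (it.hasNext()) {
            Event a = it.next();
            if (now - a.time >= window) {
                output.add("timeout " + a.name);
                it.remove();
            }
        }
    }

    // Replays the scenario from the report (times in seconds, window of 2).
    static List<String> run() {
        TimeoutExpectationModel model = new TimeoutExpectationModel(2);
        model.onEvent("A-1", 'a', 1);
        model.onEvent("B-1", 'b', 2);
        model.onEvent("A-2", 'a', 2);
        model.onEvent("A-3", 'a', 5);
        model.onWatermark(5);
        return model.output;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [match A-1,B-1, timeout A-2]
    }
}
```

Under these assumptions the watermark at 5 produces one match (A-1, B-1) and one timeout (A-2), which is the outcome the reporter expected; A-3 remains a partial match because its window has not yet elapsed.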


