Posted to issues@flink.apache.org by "Kostas Kloudas (JIRA)" <ji...@apache.org> on 2017/10/05 12:52:00 UTC

[jira] [Commented] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

    [ https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16192823#comment-16192823 ] 

Kostas Kloudas commented on FLINK-7549:
---------------------------------------

Hi [~info@paolorendano.it],

Is this issue still valid, or is it resolved by setting the [[timeCharacteristic]] to event time?
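
For reference, in Flink 1.3 the time characteristic is set with env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); the snippet quoted below assigns timestamps and watermarks but does not show that call. As far as I know, under event time the CEP operator buffers elements and processes them in timestamp order once the watermark has passed them, while under processing time it processes them in arrival order. The following is a minimal, Flink-free sketch of why that could matter for a strict "next" pattern. It assumes that the two statuses of a device can arrive swapped at high rates, which has not been confirmed for this issue; the Status class, the device names, and the timestamps are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderingSketch {

    /** Hypothetical stand-in for MyMessageWrapper: device key, status, event timestamp in ms. */
    static final class Status {
        final String device;
        final String st;
        final long timestamp;

        Status(String device, String st, long timestamp) {
            this.device = device;
            this.st = st;
            this.timestamp = timestamp;
        }
    }

    /** Counts, per device, each "none" that is immediately followed by "started". */
    static int countMatches(List<Status> inProcessingOrder) {
        Map<String, String> lastStatusByDevice = new HashMap<>();
        int matches = 0;
        for (Status s : inProcessingOrder) {
            // put() returns the previous status seen for this device, or null.
            String previous = lastStatusByDevice.put(s.device, s.st);
            if ("none".equals(previous) && "started".equals(s.st)) {
                matches++;
            }
        }
        return matches;
    }

    /** Returns a copy ordered by event timestamp, imitating event-time processing. */
    static List<Status> sortedByTimestamp(List<Status> arrival) {
        List<Status> sorted = new ArrayList<>(arrival);
        sorted.sort(Comparator.comparingLong((Status s) -> s.timestamp));
        return sorted;
    }

    public static void main(String[] args) {
        List<Status> arrival = new ArrayList<>();
        arrival.add(new Status("dev-1", "none", 1000L));
        arrival.add(new Status("dev-1", "started", 1001L));
        // Assumed reordering: the second status of dev-2 arrives before the first.
        arrival.add(new Status("dev-2", "started", 1003L));
        arrival.add(new Status("dev-2", "none", 1002L));

        System.out.println("matches in arrival order:   " + countMatches(arrival));
        System.out.println("matches in timestamp order: " + countMatches(sortedByTimestamp(arrival)));
    }
}
```

In this sketch the arrival order loses the dev-2 match, while the timestamp order recovers it. If the job really runs in processing time and the statuses do get reordered, that would be consistent with the partial results described below, but it is only one possible explanation.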

> CEP - Pattern not discovered if source streaming is very fast
> -------------------------------------------------------------
>
>                 Key: FLINK-7549
>                 URL: https://issues.apache.org/jira/browse/FLINK-7549
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.1, 1.3.2
>            Reporter: Paolo Rendano
>
> Hi all,
> I'm stress testing my pattern, using JMeter to populate source data on a RabbitMQ queue. This queue contains statuses generated by different devices. In my test case I loop for 1000 cycles, each one sending the first and then the second status, which together generate the event using Flink CEP (statuses keyed by device). I expect to get an output of 1000 events.
> In my early tests I noticed that I get only partial results in the output (70-80% of the expected ones). Introducing a delay in the JMeter plan between the sending of the two statuses solved the problem. The minimum delay that makes things work is 20-25 ms (this is on my local machine, of course; it may vary on other machines).
> My code is structured this way (the following is a simplification):
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setAutoWatermarkInterval(100L);
> // source definition
> DataStream<MyMessageWrapper> dataStreamSource =
>                     env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
>                             conf.getSourceExchange(),
>                             conf.getSourceRoutingKey(),
>                             conf.getSourceQueueName(),
>                             true,
>                             new MyMessageWrapperSchema()))
>                             .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
>                                 private static final long serialVersionUID = -1L;
>                                 @Override
>                                 public long extractTimestamp(MyMessageWrapper element) {
>                                     if (element.getData().get("stateTimestamp")==null) {
>                                         throw new RuntimeException("Status Timestamp is null during time ordering for device [" +  element.getData().get("deviceCode") + "]");
>                                     }
>                                     return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
>                                 }
>                             })
>                             .name("MyIncomingStatus");
> // PATTERN  DEFINITION
> Pattern<MyMessageWrapper, ?> myPattern = Pattern
>                 .<MyMessageWrapper>begin("start")
>                 	.subtype(MyMessageWrapper.class)
>                 	.where(whereEquals("st", "none"))
>                 .next("end")
>                 	.subtype(MyMessageWrapper.class)
>                 	.where(whereEquals("st","started"))
>                 .within(Time.minutes(3));
> // CEP DEFINITION
> PatternStream<MyMessageWrapper> myPatternStream = CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream<Either<TimeoutEvent, MyMessageWrapper >> outputStream = myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);
> // SINK DEFINITION
> outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent");
> {code}
> Digging in and logging the messages received by Flink in "extractTimestamp", what happens is that at such a high message rate, the source may receive messages with the same timestamp but a different deviceCode.
> Any idea?
> Thanks, regards
> Paolo


