Posted to issues@flink.apache.org by "Paolo Rendano (JIRA)" <ji...@apache.org> on 2017/08/29 10:04:00 UTC

[jira] [Created] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

Paolo Rendano created FLINK-7549:
------------------------------------

             Summary: CEP - Pattern not discovered if source streaming is very fast
                 Key: FLINK-7549
                 URL: https://issues.apache.org/jira/browse/FLINK-7549
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.3.2, 1.3.1
            Reporter: Paolo Rendano


Hi all,
I'm doing some stress tests on my pattern, using JMeter to publish source data to a RabbitMQ queue. The queue contains statuses generated by different devices. My test plan loops for 1000 cycles, each cycle sending in sequence the first and the second status that together produce the event via Flink CEP (statuses are keyed by device).
In my early tests I noticed that I only get partial results in output (70-80% of the expected ones). Introducing a delay in the JMeter plan between the sending of the two statuses solved the problem. The minimum delay that makes things work is 20-25 ms (of course this is on my local machine; it may vary on other machines).

My code is structured this way (the following is a simplification):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100L);

// source definition
DataStream<MyMessageWrapper> dataStreamSource =
                    env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
                            conf.getSourceExchange(),
                            conf.getSourceRoutingKey(),
                            conf.getSourceQueueName(),
                            true,
                            new MyMessageWrapperSchema()))
                            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
                                private static final long serialVersionUID = -1L;
                                @Override
                                public long extractTimestamp(MyMessageWrapper element) {
                                    if (element.getData().get("stateTimestamp")==null) {
                                        throw new RuntimeException("Status Timestamp is null during time ordering for device [" +  element.getData().get("deviceCode") + "]");
                                    }
                                    return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
                                }
                            })
                            .name("MyIncomingStatus");

// PATTERN DEFINITION
Pattern<MyMessageWrapper, ?> myPattern = Pattern
                .<MyMessageWrapper>begin("start")
                    .subtype(MyMessageWrapper.class)
                    .where(whereEquals("st", "none"))
                .next("end")
                    .subtype(MyMessageWrapper.class)
                    .where(whereEquals("st", "started"))
                .within(Time.minutes(3));

// CEP DEFINITION
PatternStream<MyMessageWrapper> myPatternStream = CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);

DataStream<Either<TimeoutEvent, MyMessageWrapper>> outputStream = myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);

// SINK DEFINITION
outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent");
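Just for reference, the keySelector and whereEquals helpers are not shown above; the following is only a rough sketch of what they look like (it assumes getData() on my MyMessageWrapper returns a Map of the payload fields, so take it purely as an illustration). They key the stream by deviceCode and match on the "st" field:

// rough sketch only -- assumes getData() returns a Map of the payload fields
KeySelector<MyMessageWrapper, String> keySelector = new KeySelector<MyMessageWrapper, String>() {
    private static final long serialVersionUID = -1L;
    @Override
    public String getKey(MyMessageWrapper element) throws Exception {
        // partition the stream per device, so CEP evaluates each device independently
        return String.valueOf(element.getData().get("deviceCode"));
    }
};

// builds a CEP condition that matches when the given payload field equals the given value
private static SimpleCondition<MyMessageWrapper> whereEquals(final String field, final String value) {
    return new SimpleCondition<MyMessageWrapper>() {
        private static final long serialVersionUID = -1L;
        @Override
        public boolean filter(MyMessageWrapper element) {
            return value.equals(element.getData().get(field));
        }
    };
}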

Digging in and logging the messages received by Flink in "extractTimestamp", what happens is that at such a high message rate the source may receive messages with the same timestamp but with a different deviceCode.
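
To make the same-timestamp scenario easier to reproduce outside my setup, here is a minimal self-contained sketch (it uses plain tuples instead of MyMessageWrapper and an in-memory source, so it is only an approximation of my real job) where two devices emit their two statuses with colliding timestamps:

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

public class SameTimestampCepTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // two devices, each emitting "none" then "started"; the timestamps of the
        // two devices collide on purpose to mimic the high-rate scenario
        DataStream<Tuple3<String, String, Long>> source = env
                .fromElements(
                        Tuple3.of("deviceA", "none", 1000L),
                        Tuple3.of("deviceB", "none", 1000L),
                        Tuple3.of("deviceA", "started", 1020L),
                        Tuple3.of("deviceB", "started", 1020L))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                });

        // same shape as my real pattern: "none" immediately followed by "started"
        Pattern<Tuple3<String, String, Long>, ?> pattern = Pattern
                .<Tuple3<String, String, Long>>begin("start")
                .where(new SimpleCondition<Tuple3<String, String, Long>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Long> value) {
                        return "none".equals(value.f1);
                    }
                })
                .next("end")
                .where(new SimpleCondition<Tuple3<String, String, Long>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Long> value) {
                        return "started".equals(value.f1);
                    }
                })
                .within(Time.minutes(3));

        // one match per device is expected
        PatternStream<Tuple3<String, String, Long>> patternStream =
                CEP.pattern(source.keyBy(0), pattern);

        patternStream.select(new PatternSelectFunction<Tuple3<String, String, Long>, String>() {
            @Override
            public String select(Map<String, List<Tuple3<String, String, Long>>> match) {
                return match.get("start").get(0).f0 + " matched";
            }
        }).print();

        env.execute("same-timestamp CEP test");
    }
}
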
Any idea?

Thanks, regards
Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)