Posted to issues@flink.apache.org by "Dian Fu (JIRA)" <ji...@apache.org> on 2018/10/18 03:09:00 UTC

[jira] [Commented] (FLINK-10577) CEP's greedy() doesn't work

    [ https://issues.apache.org/jira/browse/FLINK-10577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654571#comment-16654571 ] 

Dian Fu commented on FLINK-10577:
---------------------------------

Hi [~simahoa], this is currently by design. For the pattern (start middle+), when an event matching `middle` arrives, the result `start middle` is emitted immediately, because we do not know whether any more events matching `middle` will follow in the stream. When another event matching `middle` arrives, `start middle middle` is emitted, which can be seen as an update of the previously sent result `start middle`.
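
As a rough illustration of the emission order described above, here is a minimal plain-Java sketch. It is not Flink's NFA and does not use the CEP API: the class `GreedyEmissionSketch` and the method `emittedMatches` are hypothetical names, and the sketch ignores keying, the `within` window, and the after-match skip strategy. It only models one result being emitted each time an event matching `middle` arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model, NOT Flink's implementation: it only mirrors the emission
// order described for the pattern (start middle+).
class GreedyEmissionSketch {

    // Hypothetical helper. As in the reported job, "r" matches `start` and
    // any other value matches `middle`.
    static List<String> emittedMatches(List<String> events) {
        List<String> results = new ArrayList<>();
        StringBuilder current = null; // partial match; null until `start` is seen
        for (String event : events) {
            if (current == null) {
                if (event.equals("r")) {
                    current = new StringBuilder("start");
                }
            } else if (!event.equals("r")) {
                // No way to know whether more `middle` events will follow,
                // so the match collected so far is emitted right away.
                current.append(" middle");
                results.add(current.toString());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Prints "start middle", then "start middle middle".
        for (String match : emittedMatches(Arrays.asList("r", "p", "p"))) {
            System.out.println(match);
        }
    }
}
```

In this model each later result extends the previous one, which is the sense in which it can be read as an update of the result sent before.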

> CEP's greedy() doesn't work
> ---------------------------
>
>                 Key: FLINK-10577
>                 URL: https://issues.apache.org/jira/browse/FLINK-10577
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.6.0
>            Reporter: zhanghao
>            Priority: Major
>
> I think the greedy operator has a problem.
> Given the Java code below:
>  
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "r"),
> Tuple3.of(new Integer(101), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "p"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:01").getTime(), "p"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:03").getTime(), "p"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:04").getTime(), "p"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:05").getTime(), "c"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:08").getTime(), "c"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:11").getTime(), "a")
> ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, Long, String>>(Time.seconds(2)) {
> private static final long serialVersionUID = 1L;
> @Override
> public long extractTimestamp(Tuple3<Integer, Long, String> element) {
> return element.f1; 
> }
> });
> AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent();
> Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer, Long, String>>begin("start", strategy)
> .where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
> private static final long serialVersionUID = 1L;
> @Override
> public boolean filter(Tuple3<Integer, Long, String> e) {
> return e.f2.equals("r");
> }
> }).followedBy("middle").where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
> private static final long serialVersionUID = 1L;
> @Override
> public boolean filter(Tuple3<Integer, Long, String> e) throws Exception {
> return !e.f2.equals("r");
> }
> }).oneOrMore().greedy()
> .within(Time.seconds(10));
> CEP.pattern(input.keyBy(0), pattern)
> .select(new PatternSelectFunction<Tuple3<Integer, Long, String>, String>() {
> private static final long serialVersionUID = 1L;
> @Override
> public String select(Map<String, List<Tuple3<Integer, Long, String>>> pattern) {
> StringBuilder builder = new StringBuilder();
> List<Tuple3<Integer, Long, String>> start = pattern.get("start");
> List<Tuple3<Integer, Long, String>> middle = pattern.get("middle");
> for (Tuple3<Integer, Long, String> t : start) {
> builder.append(t.f0).append(",");
> }
> for (Tuple3<Integer, Long, String> t : middle) {
> builder.append(t.f0).append(",");
> }
> return builder.toString(); 
> }
> })
> .print(); 
> env.execute();{code}
> I would like to see: 100,100,100,100,100,100
> However, it matches 100,100.
> I tried AfterMatchSkipStrategy.skipPastLastEvent() to skip some partial matches, but it also matches 100,100.
> Is there something important about the greedy operator that I misunderstand?


