You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "zhanghao (JIRA)" <ji...@apache.org> on 2018/10/17 02:49:00 UTC

[jira] [Created] (FLINK-10577) CEP's greedy() doesn't work

zhanghao created FLINK-10577:
--------------------------------

             Summary: CEP's greedy() doesn't work
                 Key: FLINK-10577
                 URL: https://issues.apache.org/jira/browse/FLINK-10577
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.6.0
            Reporter: zhanghao


I think greedy operator has some problem.

Given the below java code:

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "r"),
Tuple3.of(new Integer(101), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:01").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:03").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:04").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:05").getTime(), "c"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:08").getTime(), "c"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:11").getTime(), "a")
).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, Long, String>>(Time.seconds(2)) {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(Tuple3<Integer, Long, String> element) {
return element.f1; 
}
});
AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent();
Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer, Long, String>>begin("start", strategy)
.where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(Tuple3<Integer, Long, String> e) {
return e.f2.equals("r") ? true : false;
}
}).followedBy("middle").where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(Tuple3<Integer, Long, String> e) throws Exception {
return !e.f2.equals("r") ? true : false;
}
}).oneOrMore().greedy()
.within(Time.seconds(10));
CEP.pattern(input.keyBy(0), pattern)
.select(new PatternSelectFunction<Tuple3<Integer, Long, String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String select(Map<String, List<Tuple3<Integer, Long, String>>> pattern) {
StringBuilder builder = new StringBuilder();
List<Tuple3<Integer, Long, String>> start = pattern.get("start");
List<Tuple3<Integer, Long, String>> middle = pattern.get("middle");
for (Tuple3<Integer, Long, String> t : start) {
builder.append(t.f0).append(",");
}
for (Tuple3<Integer, Long, String> t : middle) {
builder.append(t.f0).append(",");
}
return builder.toString(); 
}
})
.print(); 
env.execute();{code}
I would like to see:100,100,100,100,100,100

however it matches 100,100

I have tried to use AfterMatchSkipStrategy.skipPastLastEvent() for skipping some partial matches,it still failed,it also matches 100,100.

Is there something important about greedy operator that i misunderstood?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)