You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "zhanghao (JIRA)" <ji...@apache.org> on 2018/10/17 02:51:00 UTC

[jira] [Updated] (FLINK-10577) CEP's greedy() doesn't work

     [ https://issues.apache.org/jira/browse/FLINK-10577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhanghao updated FLINK-10577:
-----------------------------
    Description: 
I think the greedy operator has a problem.

Given the following Java code:

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "r"),
Tuple3.of(new Integer(101), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:01").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:03").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:04").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:05").getTime(), "c"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:08").getTime(), "c"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:11").getTime(), "a")
).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, Long, String>>(Time.seconds(2)) {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(Tuple3<Integer, Long, String> element) {
return element.f1; 
}
});
AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent();
Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer, Long, String>>begin("start", strategy)
.where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(Tuple3<Integer, Long, String> e) {
return e.f2.equals("r") ? true : false;
}
}).followedBy("middle").where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(Tuple3<Integer, Long, String> e) throws Exception {
return !e.f2.equals("r") ? true : false;
}
}).oneOrMore().greedy()
.within(Time.seconds(10));
CEP.pattern(input.keyBy(0), pattern)
.select(new PatternSelectFunction<Tuple3<Integer, Long, String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String select(Map<String, List<Tuple3<Integer, Long, String>>> pattern) {
StringBuilder builder = new StringBuilder();
List<Tuple3<Integer, Long, String>> start = pattern.get("start");
List<Tuple3<Integer, Long, String>> middle = pattern.get("middle");
for (Tuple3<Integer, Long, String> t : start) {
builder.append(t.f0).append(",");
}
for (Tuple3<Integer, Long, String> t : middle) {
builder.append(t.f0).append(",");
}
return builder.toString(); 
}
})
.print(); 
env.execute();{code}
I would like to see: 100,100,100,100,100,100

However, it only matches 100,100.

I have also tried using AfterMatchSkipStrategy.skipPastLastEvent() to skip some partial matches, but it still matches 100,100.

Is there something important about the greedy operator that I have misunderstood?

  was:
I think greedy operator has some problem.

Given the below java code:

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "r"),
Tuple3.of(new Integer(101), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:01").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:03").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:04").getTime(), "p"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:05").getTime(), "c"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:08").getTime(), "c"),
Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:11").getTime(), "a")
).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, Long, String>>(Time.seconds(2)) {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(Tuple3<Integer, Long, String> element) {
return element.f1; 
}
});
AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent();
Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer, Long, String>>begin("start", strategy)
.where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(Tuple3<Integer, Long, String> e) {
return e.f2.equals("r") ? true : false;
}
}).followedBy("middle").where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(Tuple3<Integer, Long, String> e) throws Exception {
return !e.f2.equals("r") ? true : false;
}
}).oneOrMore().greedy()
.within(Time.seconds(10));
CEP.pattern(input.keyBy(0), pattern)
.select(new PatternSelectFunction<Tuple3<Integer, Long, String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String select(Map<String, List<Tuple3<Integer, Long, String>>> pattern) {
StringBuilder builder = new StringBuilder();
List<Tuple3<Integer, Long, String>> start = pattern.get("start");
List<Tuple3<Integer, Long, String>> middle = pattern.get("middle");
for (Tuple3<Integer, Long, String> t : start) {
builder.append(t.f0).append(",");
}
for (Tuple3<Integer, Long, String> t : middle) {
builder.append(t.f0).append(",");
}
return builder.toString(); 
}
})
.print(); 
env.execute();{code}
I would like to see:100,100,100,100,100,100

however it matches 100,100

I have tried to use AfterMatchSkipStrategy.skipPastLastEvent() for skipping some partial matches,it still failed,it also matches 100,100.

Is there something important about greedy operator that i misunderstood?


> CEP's greedy() doesn't work
> ---------------------------
>
>                 Key: FLINK-10577
>                 URL: https://issues.apache.org/jira/browse/FLINK-10577
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.6.0
>            Reporter: zhanghao
>            Priority: Major
>
> I think greedy operator has some problem.
> Given the below java code:
>  
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "r"),
> Tuple3.of(new Integer(101), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "p"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:01").getTime(), "p"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:03").getTime(), "p"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:04").getTime(), "p"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:05").getTime(), "c"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:08").getTime(), "c"),
> Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:11").getTime(), "a")
> ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, Long, String>>(Time.seconds(2)) {
> private static final long serialVersionUID = 1L;
> @Override
> public long extractTimestamp(Tuple3<Integer, Long, String> element) {
> return element.f1; 
> }
> });
> AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent();
> Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer, Long, String>>begin("start", strategy)
> .where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
> private static final long serialVersionUID = 1L;
> @Override
> public boolean filter(Tuple3<Integer, Long, String> e) {
> return e.f2.equals("r") ? true : false;
> }
> }).followedBy("middle").where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
> private static final long serialVersionUID = 1L;
> @Override
> public boolean filter(Tuple3<Integer, Long, String> e) throws Exception {
> return !e.f2.equals("r") ? true : false;
> }
> }).oneOrMore().greedy()
> .within(Time.seconds(10));
> CEP.pattern(input.keyBy(0), pattern)
> .select(new PatternSelectFunction<Tuple3<Integer, Long, String>, String>() {
> private static final long serialVersionUID = 1L;
> @Override
> public String select(Map<String, List<Tuple3<Integer, Long, String>>> pattern) {
> StringBuilder builder = new StringBuilder();
> List<Tuple3<Integer, Long, String>> start = pattern.get("start");
> List<Tuple3<Integer, Long, String>> middle = pattern.get("middle");
> for (Tuple3<Integer, Long, String> t : start) {
> builder.append(t.f0).append(",");
> }
> for (Tuple3<Integer, Long, String> t : middle) {
> builder.append(t.f0).append(",");
> }
> return builder.toString(); 
> }
> })
> .print(); 
> env.execute();{code}
> I would like to see:100,100,100,100,100,100
> however it matches 100,100
> I have tried to use AfterMatchSkipStrategy.skipPastLastEvent() for skipping some partial matches,it also matches 100,100.
> Is there something important about greedy operator that i misunderstood?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)