Posted to issues@flink.apache.org by "Lucas Resch (JIRA)" <ji...@apache.org> on 2018/06/07 15:31:00 UTC
[jira] [Created] (FLINK-9547) CEP pattern not called on windowed stream
Lucas Resch created FLINK-9547:
----------------------------------
Summary: CEP pattern not called on windowed stream
Key: FLINK-9547
URL: https://issues.apache.org/jira/browse/FLINK-9547
Project: Flink
Issue Type: Bug
Components: CEP
Affects Versions: 1.5.0, 1.3.2
Reporter: Lucas Resch
When trying to match a CEP pattern on a stream that was produced by a window operation, the pattern's condition is never called. The following example code shows where the issue was noticed:
{code:java}
// Set up stream
SingleOutputStreamOperator<ForceZ> forces = ...
    .filter(new FilterForcesFunction())
    .process(new ProcessForcesFunction());

// Define mock pattern
Pattern<ForceZ, ?> forcesMock = Pattern.<ForceZ>begin("start").where(new SimpleCondition<ForceZ>() {
    @Override
    public boolean filter(ForceZ value) {
        // This is called as expected
        return true;
    }
});

// Print pattern results
// This actually prints all incoming events as expected
// (CEP.pattern takes the input stream first, then the pattern)
CEP.pattern(forces, forcesMock)
    .select(new PatternSelectFunction<ForceZ, ForceZ>() {
        @Override
        public ForceZ select(Map<String, List<ForceZ>> pattern) {
            return pattern.get("start").get(0);
        }
    }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Interval> intervals = forces
    .countWindowAll(2, 1)
    .process(new ForceWindowFunction());

// Define mock pattern
Pattern<Interval, Interval> intervalMock = Pattern.<Interval>begin("start").where(new SimpleCondition<Interval>() {
    @Override
    public boolean filter(Interval value) throws Exception {
        // This is never called
        return true;
    }
});

// Print pattern results
// Doesn't print anything since the mock condition is never called
CEP.pattern(intervals, intervalMock)
    .select(new PatternSelectFunction<Interval, Interval>() {
        @Override
        public Interval select(Map<String, List<Interval>> pattern) throws Exception {
            return pattern.get("start").get(0);
        }
    }).print();
{code}
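For reference, here is a rough plain-Java sketch of which elements a sliding count window such as countWindowAll(2, 1) would hand to the window function. It assumes the window fires after every `slide` elements and keeps at most the last `size` elements. The class SlidingCountWindowSketch and its windows helper are hypothetical names used only for illustration. They are not part of Flink, and the sketch does not model timestamps or watermarks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SlidingCountWindowSketch {

    // Hypothetical helper (not a Flink API): simulates a count window that
    // fires every `slide` elements and retains at most `size` elements.
    static <T> List<List<T>> windows(List<T> events, int size, int slide) {
        List<List<T>> fired = new ArrayList<>();
        Deque<T> buffer = new ArrayDeque<>();
        int sinceLastFire = 0;
        for (T event : events) {
            buffer.addLast(event);
            sinceLastFire++;
            if (sinceLastFire == slide) {
                // Drop the oldest elements so only the last `size` remain,
                // then emit a copy of the buffer as one window firing.
                while (buffer.size() > size) {
                    buffer.removeFirst();
                }
                fired.add(new ArrayList<>(buffer));
                sinceLastFire = 0;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Prints [[f1], [f1, f2], [f2, f3], [f3, f4]]
        System.out.println(windows(List.of("f1", "f2", "f3", "f4"), 2, 1));
    }
}
```

Under these assumptions every incoming ForceZ event yields one window firing, so the intervals stream would be expected to carry one Interval per input event, provided ForceWindowFunction emits a result for each firing.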
Either I'm doing something wrong or this is a major bug.