Posted to issues@flink.apache.org by "Dawid Wysakowicz (JIRA)" <ji...@apache.org> on 2018/06/07 15:43:00 UTC
[jira] [Commented] (FLINK-9547) CEP pattern not called on windowed stream
[ https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504834#comment-16504834 ]
Dawid Wysakowicz commented on FLINK-9547:
-----------------------------------------
Hi [~MLNotW]. Could you provide a runnable example, including the inputs and the ProcessFunctions, that we could use to reproduce this bug?
> CEP pattern not called on windowed stream
> -----------------------------------------
>
> Key: FLINK-9547
> URL: https://issues.apache.org/jira/browse/FLINK-9547
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.3.2, 1.5.0
> Reporter: Lucas Resch
> Priority: Major
>
> When matching a pattern on a stream that was produced by a window operation, the pattern's condition is never called. The following example code shows where the issue was noticed:
> {code:java}
> // Set up stream
> SingleOutputStreamOperator<ForceZ> forces = ...
>     .filter(new FilterForcesFunction())
>     .process(new ProcessForcesFunction());
> // Define mock pattern
> Pattern<ForceZ, ?> forcesMock = Pattern.<ForceZ>begin("start").where(new SimpleCondition<ForceZ>() {
>     @Override
>     public boolean filter(ForceZ value) {
>         // This is called as expected
>         return true;
>     }
> });
> // Print pattern results
> // This actually prints all incoming events as expected
> CEP.pattern(forces, forcesMock)
>     .select(new PatternSelectFunction<ForceZ, ForceZ>() {
>         @Override
>         public ForceZ select(Map<String, List<ForceZ>> pattern) {
>             return pattern.get("start").get(0);
>         }
>     }).print();
> // Create another stream based on a sliding window over the input stream
> SingleOutputStreamOperator<Interval> intervals = forces
>     .countWindowAll(2, 1)
>     .process(new ForceWindowFunction());
> // Define mock pattern
> Pattern<Interval, Interval> intervalMock = Pattern.<Interval>begin("start").where(new SimpleCondition<Interval>() {
>     @Override
>     public boolean filter(Interval value) throws Exception {
>         // This is never called
>         return true;
>     }
> });
> // Print pattern results
> // Doesn't print anything since the mock condition is never called
> CEP.pattern(intervals, intervalMock)
>     .select(new PatternSelectFunction<Interval, Interval>() {
>         @Override
>         public Interval select(Map<String, List<Interval>> pattern) throws Exception {
>             return pattern.get("start").get(0);
>         }
>     }).print();
> {code}
> Either I'm doing something wrong or this is a major bug.
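To make the expected input of the second pattern concrete, here is a plain-Java sketch (no Flink dependency) of what I would expect `countWindowAll(2, 1)` to hand to the window function: a firing after every element, over at most the two newest elements. The helper `slidingCountWindows` and the sample inputs `f1` to `f4` are made up for illustration and are not part of Flink or of the reporter's job; the sketch only imitates the count-trigger and count-evictor behaviour as I understand it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SlidingCountWindowSketch {

    // Hypothetical helper, not a Flink API: imitates a count window that
    // fires every `slide` elements and keeps at most the newest `size` elements.
    static <T> List<List<T>> slidingCountWindows(List<T> input, int size, int slide) {
        List<List<T>> firings = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        int sinceLastFiring = 0;
        for (T element : input) {
            buffer.add(element);
            sinceLastFiring++;
            if (sinceLastFiring == slide) {
                // Evict everything but the newest `size` elements, then fire.
                while (buffer.size() > size) {
                    buffer.remove(0);
                }
                firings.add(new ArrayList<>(buffer));
                sinceLastFiring = 0;
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // Made-up inputs standing in for four ForceZ events.
        List<String> forces = Arrays.asList("f1", "f2", "f3", "f4");
        // prints [[f1], [f1, f2], [f2, f3], [f3, f4]]
        System.out.println(slidingCountWindows(forces, 2, 1));
    }
}
```

If that reading is right, and assuming `ForceWindowFunction` emits one `Interval` per firing, the `intervals` stream should carry one element per incoming event, so the second condition would be expected to run just as often as the first.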