You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Lucas Resch (JIRA)" <ji...@apache.org> on 2018/06/07 18:07:00 UTC

[jira] [Comment Edited] (FLINK-9547) CEP pattern not called on windowed stream

    [ https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505021#comment-16505021 ] 

Lucas Resch edited comment on FLINK-9547 at 6/7/18 6:06 PM:
------------------------------------------------------------

[~dawidwys] I created a small example that does something similar. Somehow the behavior is different though. Now it doesn't call the initial pattern but the one on the windowed stream is called. Something is definitely wrong.

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Integer> objectDataStreamSource = env.fromElements(
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10
);

SingleOutputStreamOperator<Long> forces = objectDataStreamSource
        .filter((FilterFunction<Integer>) Objects::nonNull)
        .process(new ProcessFunction<Integer, Long>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Long> out) throws Exception {
                out.collect(value.longValue());
            }
        });

Pattern<Long, Long> forcesMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) {
        return true;
    }
});

CEP.pattern(forces, forcesMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Prints %d as expected", pattern.get("start").get(0));
            }
        }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Long> intervals = forces
        .countWindowAll(2, 1)
        .process(new ProcessAllWindowFunction<Long, Long, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                List<Long> items = new ArrayList<>();
                elements.forEach(items::add);
                if (items.size() == 2) {
                    out.collect(items.get(0));
                }
            }
        });

Pattern<Long, Long> intervalMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return true;
    }
});

CEP.pattern(intervals, intervalMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Doesn't print %d", pattern.get("start").get(0));
            }
        }).print();

env.execute();
{code}


was (Author: mlnotw):
I created a small example that does something similar. Somehow the behavior is different though. Now it doesn't call the initial pattern but the one on the windowed stream is called. Something is definitely wrong.

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Integer> objectDataStreamSource = env.fromElements(
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10
);

SingleOutputStreamOperator<Long> forces = objectDataStreamSource
        .filter((FilterFunction<Integer>) Objects::nonNull)
        .process(new ProcessFunction<Integer, Long>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Long> out) throws Exception {
                out.collect(value.longValue());
            }
        });

Pattern<Long, Long> forcesMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) {
        return true;
    }
});

CEP.pattern(forces, forcesMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Prints %d as expected", pattern.get("start").get(0));
            }
        }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Long> intervals = forces
        .countWindowAll(2, 1)
        .process(new ProcessAllWindowFunction<Long, Long, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                List<Long> items = new ArrayList<>();
                elements.forEach(items::add);
                if (items.size() == 2) {
                    out.collect(items.get(0));
                }
            }
        });

Pattern<Long, Long> intervalMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return true;
    }
});

CEP.pattern(intervals, intervalMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Doesn't print %d", pattern.get("start").get(0));
            }
        }).print();

env.execute();
{code}

> CEP pattern not called on windowed stream
> -----------------------------------------
>
>                 Key: FLINK-9547
>                 URL: https://issues.apache.org/jira/browse/FLINK-9547
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.2, 1.5.0
>            Reporter: Lucas Resch
>            Priority: Major
>
> When trying to match a pattern on a stream that was windowed the pattern will not be called. The following shows example code where the issue was noticed:
> {code:java}
> // Set up stream
> SingleOutputStreamOperator<ForceZ> forces = ...
>         .filter(new FilterForcesFunction())
>         .process(new ProcessForcesFunction());
> // Define mock pattern
> Pattern<ForceZ, ?> forcesMock = Pattern.<ForceZ>begin("start").where(new SimpleCondition<ForceZ>() {
>     @Override
>     public boolean filter(ForceZ value) {
>         // This is called as expected
>         return true;
>     }
> });
> // Print pattern results
> // This actually prints all incoming events as expected
> CEP.pattern(forcesMock, mock)
>         .select(new PatternSelectFunction<ForceZ, ForceZ>() {
>             @Override
>             public ForceZ select(Map<String, List<ForceZ>> pattern){
>                 return pattern.get("start").get(0);
>             }
>         }).print();
> // Create another stream based on a sliding window over the input stream
> SingleOutputStreamOperator<Interval> intervals = forces
>         .countWindowAll(2, 1)
>         .process(new ForceWindowFunction());
> // Define mock pattern
> Pattern<Interval, Interval> intervalMock = Pattern.<Interval>begin("start").where(new SimpleCondition<Interval>() {
>     @Override
>     public boolean filter(Interval value) throws Exception {
>         // This is never called
>         return true;
>     }
> });
> // Print pattern results
> // Doesn't print anything since the mock condition is never called
> CEP.pattern(intervals, intervalMock)
>         .select(new PatternSelectFunction<Interval, Interval>() {
>             @Override
>             public Interval select(Map<String, List<Interval>> pattern) throws Exception {
>                 return pattern.get("start").get(0);
>             }
>         }).print();
> {code}
> Either I'm doing something wrong or this is a major bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)