You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "aitozi (JIRA)" <ji...@apache.org> on 2018/09/03 09:36:00 UTC

[jira] [Commented] (FLINK-8914) CEP's greedy() modifier doesn't work

    [ https://issues.apache.org/jira/browse/FLINK-8914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16601943#comment-16601943 ] 

aitozi commented on FLINK-8914:
-------------------------------

Hi,[~alpinegizmo] & [~GuoLJ]

Now It can be solved like this 

{code:java}
Pattern<Integer, ?> onesThenZero = Pattern.<Integer>begin("ones", AfterMatchSkipStrategy.skipPastLastEvent())
	.where(new SimpleCondition<Integer>() {
		@Override
		public boolean filter(Integer value) throws Exception {
			return value == 1;
		}
	})
	.oneOrMore()
	.greedy()
	.consecutive()
	.next("zero")
	.where(new SimpleCondition<Integer>() {
		@Override
		public boolean filter(Integer value) throws Exception {
			return value == 0;
		}
	});
{code}

By specify the AfterMatchSkipStrategy to skip middle result. You can try it with flink-cep version 1.6.0

> CEP's greedy() modifier doesn't work
> ------------------------------------
>
>                 Key: FLINK-8914
>                 URL: https://issues.apache.org/jira/browse/FLINK-8914
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.4.0, 1.4.1
>            Reporter: David Anderson
>            Assignee: aitozi
>            Priority: Major
>
> When applied to the first or last component of a CEP Pattern, greedy() doesn't work correctly. Here's an example:
> {code:java}
> package com.dataartisans.flinktraining.exercises.datastream_java.cep;
> import org.apache.flink.cep.CEP;
> import org.apache.flink.cep.PatternSelectFunction;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import java.util.List;
> import java.util.Map;
> public class RunLength {
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     DataStream<Integer> input = env.fromElements(1, 1, 1, 1, 1, 0, 1, 1, 1, 0);
>     Pattern<Integer, ?> onesThenZero = Pattern.<Integer>begin("ones")
>       .where(new SimpleCondition<Integer>() {
>         @Override
>         public boolean filter(Integer value) throws Exception {
>           return value == 1;
>         }
>       })
>       .oneOrMore()
>       .greedy()
>       .consecutive()
>       .next("zero")
>       .where(new SimpleCondition<Integer>() {
>         @Override
>         public boolean filter(Integer value) throws Exception {
>           return value == 0;
>         }
>       });
>       PatternStream<Integer> patternStream = CEP.pattern(input, onesThenZero);
>       // Expected: 5 3
>       // Actual: 5 4 3 2 1 3 2 1
>       patternStream.select(new LengthOfRun()).print();
>       env.execute();
>     }
>     public static class LengthOfRun implements PatternSelectFunction<Integer, Integer> {
>       public Integer select(Map<String, List<Integer>> pattern) {
>       return pattern.get("ones").size();
>     }
>   }
> }
> {code}
> The only workaround for now seems to be to rewrite the pattern so that greedy() isn't needed – i.e. by bracketing the greedy section with a prefix and suffix that both have to be matched.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)