Posted to issues@flink.apache.org by dianfu <gi...@git.apache.org> on 2017/07/11 08:06:38 UTC

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

GitHub user dianfu opened a pull request:

    https://github.com/apache/flink/pull/4296

    [FLINK-7147] [cep] Support greedy quantifier in CEP

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dianfu/flink greedy_quantifier

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4296.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4296
    
----
commit 647d4c58309990d9fc924bc4343ef811dacf4ef1
Author: Dian Fu <fu...@alibaba-inc.com>
Date:   2017-07-11T08:03:42Z

    [FLINK-7147] [cep] Support greedy quantifier in CEP

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Posted by tedyu <gi...@git.apache.org>.
GitHub user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r135367240
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -526,18 +551,32 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
     				return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional);
     			}
     
    -			final IterativeCondition<T> trueFunction = getTrueFunction();
    -
     			final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
     			// if event is accepted then all notPatterns previous to the optional states are no longer valid
     			final State<T> sink = copyWithoutTransitiveNots(sinkState);
     			singletonState.addTake(sink, takeCondition);
     
    +			// if no element accepted the previous nots are still valid.
    +			final IterativeCondition<T> proceedCondition = getTrueFunction();
    +
     			// for the first state of a group pattern, its PROCEED edge should point to the following state of
     			// that group pattern and the edge will be added at the end of creating the NFA for that group pattern
     			if (isOptional && !headOfGroup(currentPattern)) {
    -				// if no element accepted the previous nots are still valid.
    -				singletonState.addProceed(proceedState, trueFunction);
    +				if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
    +					final IterativeCondition<T> untilCondition =
    +						(IterativeCondition<T>) currentPattern.getUntilCondition();
    +					if (untilCondition != null) {
    +						singletonState.addProceed(
    +							originalStateMap.get(proceedState.getName()),
    +							new AndCondition<>(proceedCondition, untilCondition));
    --- End diff --
    
    Why is this not wrapped with NotCondition?
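
To make the question concrete, here is a minimal sketch, assuming plain `java.util.function.Predicate` as a stand-in for Flink's `IterativeCondition`, `AndCondition`, and `NotCondition`. It does not use the Flink API, and the class name and the `price > 3.0` predicate (borrowed from the until condition in this PR's tests) are illustrative assumptions only. It shows how the two candidate edge conditions differ on the same input; which one a given PROCEED edge should carry is the question raised in the comment.

```java
import java.util.function.Predicate;

// Hypothetical illustration only; not part of the PR or of Flink.
class ProceedConditionSketch {
    // Stand-in for getTrueFunction(): accepts every event.
    static final Predicate<Double> PROCEED = price -> true;

    // Stand-in for an until condition such as "price > 3.0".
    static final Predicate<Double> UNTIL = price -> price > 3.0;

    // Shape used in the diff: proceed AND until.
    static final Predicate<Double> AND_UNTIL = PROCEED.and(UNTIL);

    // Shape the comment asks about: proceed AND NOT until.
    static final Predicate<Double> AND_NOT_UNTIL = PROCEED.and(UNTIL.negate());

    public static void main(String[] args) {
        // An event whose price satisfies the until condition.
        System.out.println(AND_UNTIL.test(4.0));      // true
        System.out.println(AND_NOT_UNTIL.test(4.0));  // false
        // An event whose price does not satisfy the until condition.
        System.out.println(AND_UNTIL.test(2.0));      // false
        System.out.println(AND_NOT_UNTIL.test(2.0));  // true
    }
}
```

The two compositions are exact complements whenever the proceed condition is always true, so swapping one for the other inverts which events may follow the edge.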


Posted by dianfu <gi...@git.apache.org>.
GitHub user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131804932
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,1068 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +	@Test
    +	public void testGreedyZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().consecutive().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional();
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c),
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(43, "d", 3.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(d, 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e),
    +			Lists.newArrayList(c, a1, a2, d, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(a3, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList());
    +	}
    +
    +	@Test
    +	public void testGreedyUtilOneOrMoreWithDummyEventsAfterQuantifier() {
    --- End diff --
    
    Updated


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131804398
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -386,6 +387,19 @@ private void checkPatternNameUniqueness(final Pattern pattern) {
     			return copyOfSink;
     		}
     
    +		private State<T> copy(final State<T> state) {
    +			final State<T> copyOfState = new State<>(state.getName(), state.getStateType());
    --- End diff --
    
    Updated.



[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    +1, LGTM



[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    @dawidwys Any comments?



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r135441351
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -421,6 +437,15 @@ private void addStopStateToLooping(final State<T> loopingState) {
     				untilCondition,
     				true);
     
    +			if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY) &&
    +				times.getFrom() != times.getTo()) {
    +				if (untilCondition != null) {
    +					State<T> sinkStateCopy = copy(sinkState);
    +					originalStateMap.put(sinkState.getName(), sinkStateCopy);
    --- End diff --
    
    `originalStateMap` is only used while compiling the NFA and will be garbage-collected once the NFA has been created, so I think it's unnecessary to clear the entries.



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r129261577
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java ---
    @@ -105,6 +107,14 @@ public void optional() {
     		properties.add(Quantifier.QuantifierProperty.OPTIONAL);
     	}
     
    +	public void greedy() {
    +		greedy = true;
    --- End diff --
    
    What about applying greedy to a SINGLETON state? I think that is an illegal combination. Also, maybe turn GREEDY into a `QuantifierProperty`? That would reduce the number of methods/fields.
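
A minimal sketch of that suggestion, assuming a simplified stand-in class. The names `QuantifierSketch` and `Property` are hypothetical and are not Flink's actual `Quantifier` API; the point is only that greediness can live in the same property set as the other flags, and that a singleton can be rejected at the call site.

```java
import java.util.EnumSet;

// Hypothetical stand-in for illustration; not Flink's actual Quantifier class.
final class QuantifierSketch {

    // GREEDY is just another property, so no separate boolean field is needed.
    enum Property { SINGLE, LOOPING, TIMES, OPTIONAL, GREEDY }

    private final EnumSet<Property> properties;

    private QuantifierSketch(Property first) {
        this.properties = EnumSet.of(first);
    }

    static QuantifierSketch one() {
        return new QuantifierSketch(Property.SINGLE);
    }

    static QuantifierSketch looping() {
        return new QuantifierSketch(Property.LOOPING);
    }

    boolean hasProperty(Property property) {
        return properties.contains(property);
    }

    void optional() {
        properties.add(Property.OPTIONAL);
    }

    void greedy() {
        // A singleton matches exactly one event, so there is nothing to be greedy about.
        if (hasProperty(Property.SINGLE)) {
            throw new IllegalStateException("greedy() is not applicable to a singleton quantifier");
        }
        properties.add(Property.GREEDY);
    }

    public static void main(String[] args) {
        QuantifierSketch loop = looping();
        loop.greedy();
        System.out.println(loop.hasProperty(Property.GREEDY));

        try {
            one().greedy();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```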



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r129262270
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
    @@ -492,4 +506,10 @@ private void checkIfQuantifierApplied() {
     					"Current quantifier is: " + quantifier);
     		}
     	}
    +
    +	private void checkIfNoFollowedByAny() {
    +		if (quantifier.getConsumingStrategy() == ConsumingStrategy.SKIP_TILL_ANY) {
    --- End diff --
    
    I think it is a valid combination. It is the `InnerConsumingStrategy` that should not be `SKIP_TILL_ANY`.
    
    So, in my opinion, `followedByAny("loop").oneOrMore().greedy()` is valid, but `followedByAny("loop").oneOrMore().allowCombinations().greedy()` is not.
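
The distinction can be sketched as follows, assuming a simplified model in which the strategy used to reach the pattern and the strategy used inside the loop are tracked separately. `ConsumingStrategySketch` and its methods are hypothetical names for illustration, not Flink's `Pattern` or `Quantifier` API.

```java
// Hypothetical stand-in for illustration; not Flink's actual Quantifier class.
final class ConsumingStrategySketch {

    enum ConsumingStrategy { STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY }

    // How the pattern is reached from the previous one, e.g. via followedByAny(...).
    private final ConsumingStrategy consumingStrategy;
    // How events inside the loop are consumed; relaxed by allowCombinations().
    private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT;
    private boolean greedy;

    ConsumingStrategySketch(ConsumingStrategy consumingStrategy) {
        this.consumingStrategy = consumingStrategy;
    }

    ConsumingStrategySketch allowCombinations() {
        innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_ANY;
        return this;
    }

    ConsumingStrategySketch greedy() {
        // Only the inner strategy is checked; the outer one may be SKIP_TILL_ANY.
        if (innerConsumingStrategy == ConsumingStrategy.SKIP_TILL_ANY) {
            throw new IllegalStateException("greedy() cannot be combined with allowCombinations()");
        }
        greedy = true;
        return this;
    }

    ConsumingStrategy getConsumingStrategy() {
        return consumingStrategy;
    }

    boolean isGreedy() {
        return greedy;
    }

    public static void main(String[] args) {
        // Models followedByAny("loop").oneOrMore().greedy(): accepted.
        ConsumingStrategySketch ok =
            new ConsumingStrategySketch(ConsumingStrategy.SKIP_TILL_ANY).greedy();
        System.out.println(ok.isGreedy());

        // Models followedByAny("loop").oneOrMore().allowCombinations().greedy(): rejected.
        try {
            new ConsumingStrategySketch(ConsumingStrategy.SKIP_TILL_ANY).allowCombinations().greedy();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This sketch only validates when `greedy()` is called; a complete check would also have to cover `allowCombinations()` being called after `greedy()`.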



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r129263762
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +	@Test
    +	public void testGreedyFollowedBy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtil() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 4.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testEndWithGreedy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 2.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a*
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy();
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3)
    --- End diff --
    
    I have some doubts about this result. I think that in this case the result should be:
    `{c, a1, a2, a3}, {c, a1, a2}, {c, a1}, {c}`.
    
    Otherwise we would not be able to emit results at all: assuming the stream is unbounded, another `a4` could still arrive, couldn't it?
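
To make the expected output concrete, here is a small sketch that enumerates one match per prefix of the looping events. It is a plain-Java illustration of the result set listed above, not the NFA's matching logic; `TrailingLoopSketch` and `prefixMatches` are hypothetical names, and events are modeled as strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TrailingLoopSketch {

    // Returns one match per prefix of the looping events, longest first.
    // The last entry contains only the start event (the loop is optional).
    static List<List<String>> prefixMatches(String start, List<String> loopEvents) {
        List<List<String>> matches = new ArrayList<>();
        for (int taken = loopEvents.size(); taken >= 0; taken--) {
            List<String> match = new ArrayList<>();
            match.add(start);
            match.addAll(loopEvents.subList(0, taken));
            matches.add(match);
        }
        return matches;
    }

    public static void main(String[] args) {
        // prints [[c, a1, a2, a3], [c, a1, a2], [c, a1], [c]]
        System.out.println(prefixMatches("c", Arrays.asList("a1", "a2", "a3")));
    }
}
```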



[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    Merging this.
    
    @dianfu could you create a JIRA for the `Optional` after `greedy` case, to track interest in it?



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131804902
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,1068 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +	@Test
    +	public void testGreedyZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().consecutive().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional();
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c),
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(43, "d", 3.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(d, 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e),
    +			Lists.newArrayList(c, a1, a2, d, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilWithDummyEventsBeforeQuantifier() {
    --- End diff --
    
    Updated


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    @dawidwys Could you take a look at this PR? Thanks a lot in advance.


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131628265
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -386,6 +387,19 @@ private void checkPatternNameUniqueness(final Pattern pattern) {
     			return copyOfSink;
     		}
     
    +		private State<T> copy(final State<T> state) {
    +			final State<T> copyOfState = new State<>(state.getName(), state.getStateType());
    --- End diff --
    
    The copy should be made with the `createState` method, which ensures that a unique name is assigned. A unique name is necessary for serialization.
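
To illustrate the point in the comment above, here is a minimal sketch of how a factory method can guarantee unique state names, whereas a plain constructor call that reuses `state.getName()` cannot. The class `UniqueStateNames`, the method `createUniqueName`, and the `:` suffix format are hypothetical names chosen for this example. They are assumptions and are not taken from `NFACompiler`.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for a compiler that hands out uniquely named states. */
public class UniqueStateNames {
    // Every name handed out so far; a copy must not reuse any of them.
    private final Set<String> usedNames = new HashSet<>();

    /** Returns baseName if it is unused, otherwise baseName plus a numeric suffix. */
    public String createUniqueName(String baseName) {
        String candidate = baseName;
        int counter = 0;
        // Set.add returns false when the name is already taken.
        while (!usedNames.add(candidate)) {
            candidate = baseName + ":" + counter++;
        }
        return candidate;
    }

    public static void main(String[] args) {
        UniqueStateNames names = new UniqueStateNames();
        System.out.println(names.createUniqueName("middle")); // middle
        System.out.println(names.createUniqueName("middle")); // middle:0
        System.out.println(names.createUniqueName("middle")); // middle:1
    }
}
```

If copies of a state kept the original name, any lookup keyed by state name (as might happen when restoring a serialized automaton) could no longer tell the copies apart. Routing every creation through one method avoids that.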


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131629516
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,1068 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +	@Test
    +	public void testGreedyZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d? (consecutive a's, d is optional)
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().consecutive().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional();
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c),
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e (d is optional)
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(43, "d", 3.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(d, 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e (d is optional)
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e),
    +			Lists.newArrayList(c, a1, a2, d, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(a3, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList());
    +	}
    +
    +	@Test
    +	public void testGreedyUtilOneOrMoreWithDummyEventsAfterQuantifier() {
    --- End diff --
    
    Typo: `testGreedyUtilOneOrMoreWithDummyEventsAfterQuantifier` -> `testGreedyUntilOneOrMoreWithDummyEventsAfterQuantifier`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r135366609
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -526,18 +551,32 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
     				return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional);
     			}
     
    -			final IterativeCondition<T> trueFunction = getTrueFunction();
    -
     			final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
     			// if event is accepted then all notPatterns previous to the optional states are no longer valid
     			final State<T> sink = copyWithoutTransitiveNots(sinkState);
     			singletonState.addTake(sink, takeCondition);
     
    +			// if no element accepted the previous nots are still valid.
    +			final IterativeCondition<T> proceedCondition = getTrueFunction();
    +
     			// for the first state of a group pattern, its PROCEED edge should point to the following state of
     			// that group pattern and the edge will be added at the end of creating the NFA for that group pattern
     			if (isOptional && !headOfGroup(currentPattern)) {
    -				// if no element accepted the previous nots are still valid.
    -				singletonState.addProceed(proceedState, trueFunction);
    +				if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
    +					final IterativeCondition<T> untilCondition =
    +						(IterativeCondition<T>) currentPattern.getUntilCondition();
    +					if (untilCondition != null) {
    +						singletonState.addProceed(
    +							originalStateMap.get(proceedState.getName()),
    +							new AndCondition<>(proceedCondition, untilCondition));
    +					}
    +					singletonState.addProceed(proceedState,
    +						untilCondition != null
    --- End diff --
    
    Redundant check - see line 568



[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    I think we are getting close to the final version. Good job! I still have two higher-level comments, though:
    
    - Is the current `times().greedy()` behaviour intended? For the pattern `a{2, 5} b` and the sequence `a1 a2 a3 a4 b`, shouldn't it return just one match, `a1 a2 a3 a4 b`, instead of three as in `testGreedyTimesRange`?
    - This feature should be documented in the docs. It should also be stated very clearly there that it does not work for `GroupPatterns`.
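
To make the first question concrete, here is a toy model of the two readings. It is only a sketch: it does not use Flink's NFA, it assumes strict contiguity, and the class `GreedyTimesSketch`, the method `match`, and the `skipPastMatch` flag are hypothetical names invented for this illustration. With `skipPastMatch = false`, every `a` may start its own greedy match, which is one plausible way to end up with three results. With `skipPastMatch = true`, events consumed by a match are not reused, which gives the single match asked about above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class GreedyTimesSketch {

    // Toy convention: an event whose label starts with "a" counts as an 'a' event.
    static boolean isA(String event) {
        return event.startsWith("a");
    }

    // Matches a{min,max} b greedily: from each start, take as many 'a' events as allowed.
    // skipPastMatch = false: every 'a' may start its own match.
    // skipPastMatch = true: scanning resumes after the end of a completed match.
    static List<List<String>> match(List<String> events, int min, int max, boolean skipPastMatch) {
        List<List<String>> matches = new ArrayList<>();
        int i = 0;
        while (i < events.size()) {
            int run = 0;
            while (run < max && i + run < events.size() && isA(events.get(i + run))) {
                run++;
            }
            int end = i + run;
            boolean closed = run >= min && end < events.size() && events.get(end).equals("b");
            if (closed) {
                matches.add(new ArrayList<>(events.subList(i, end + 1)));
                i = skipPastMatch ? end + 1 : i + 1;
            } else {
                i++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a1", "a2", "a3", "a4", "b");
        System.out.println(match(events, 2, 5, false)); // three matches, one per start a1, a2, a3
        System.out.println(match(events, 2, 5, true));  // one match: a1 a2 a3 a4 b
    }
}
```

Whether the real implementation should behave like the first or the second call is exactly the open question; the sketch only shows that both are internally consistent definitions of "greedy".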
    
    




[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    @dawidwys Thanks a lot for the review. I have updated the patch. Currently, something goes wrong when the greedy state is followed by an optional state. The test case GreedyITCase.testGreedyZeroOrMoreBeforeOptional2 covers this (it produces duplicate results). The solutions I can think of are to remove the duplicate results in the NFA before returning them, or to disable this case for the time being. What do you think? Do you have any suggestions for this problem?
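
The first option (dropping duplicate results before returning them) could look roughly like the sketch below. It is an assumption-laden illustration, not the NFA code: `DedupMatchesSketch` and `dedup` are hypothetical names, matches are modelled as plain lists, and the approach only works if the event type implements `equals` and `hashCode` consistently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

final class DedupMatchesSketch {

    // Removes repeated matches while keeping the order of first occurrence.
    // Relies on List.equals/hashCode, which compare lists element by element.
    static <T> List<List<T>> dedup(List<List<T>> matches) {
        return new ArrayList<>(new LinkedHashSet<>(matches));
    }

    public static void main(String[] args) {
        List<List<String>> raw = new ArrayList<>();
        raw.add(Arrays.asList("c", "a1", "a2", "e"));
        raw.add(Arrays.asList("c", "a1", "a2", "e")); // duplicate result
        raw.add(Arrays.asList("c", "a1", "a2", "d", "e"));
        System.out.println(dedup(raw)); // two distinct matches remain
    }
}
```

Deduplicating at the end hides the symptom rather than preventing the NFA from producing the extra path, so it trades some wasted work for a simpler fix.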



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131804887
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,1068 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +	@Test
    +	public void testGreedyZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d?
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().consecutive().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional();
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c),
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(43, "d", 3.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(d, 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e),
    +			Lists.newArrayList(c, a1, a2, d, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier() {
    --- End diff --
    
    Updated



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131629322
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,1068 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +	@Test
    +	public void testGreedyZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d?
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().consecutive().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional();
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c),
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(43, "d", 3.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(d, 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e),
    +			Lists.newArrayList(c, a1, a2, d, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilWithDummyEventsBeforeQuantifier() {
    --- End diff --
    
    Typo: `testGreedyUtilWithDummyEventsBeforeQuantifier` -> `testGreedyUntilWithDummyEventsBeforeQuantifier`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r129832284
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +	@Test
    +	public void testGreedyFollowedBy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtil() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 4.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testEndWithGreedy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 2.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a*
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy();
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3)
    --- End diff --
    
    I agree with you. If the greedy state is the last state of the NFA, it makes no difference whether it is greedy or not. What about only allowing a greedy state as the last state if its InnerConsumingStrategy is STRICT? In that case, we can emit results as soon as we encounter an event that does not meet the conditions.
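
To illustrate the STRICT case described above, here is a minimal standalone sketch. It is plain Java and does not use the Flink CEP API; the class `GreedyTailSketch` and the method `feed` are hypothetical names introduced only for this illustration. It assumes the pattern `c a*` with a greedy, strictly contiguous loop as the last state, and it emits a match only when the first event that does not satisfy the loop condition arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is not how Flink's NFA is implemented.
public class GreedyTailSketch {

    /**
     * Feeds event names one by one and returns the matches emitted for the
     * pattern "c a*" where "a*" is greedy, strictly contiguous and last.
     * A match is emitted only when a non-"a" event closes the loop, so an
     * open loop at the end of the input emits nothing.
     */
    static List<List<String>> feed(List<String> names) {
        List<List<String>> emitted = new ArrayList<>();
        List<String> current = null; // open partial match, or null if none

        for (String name : names) {
            if (current != null && name.equals("a")) {
                // Greedy: keep consuming while the loop condition holds.
                current.add(name);
                continue;
            }
            if (current != null) {
                // First event not meeting the condition: emit the longest match.
                emitted.add(current);
                current = null;
            }
            if (name.equals("c")) {
                // The event may itself start a new partial match.
                current = new ArrayList<>();
                current.add(name);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same event names as in testEndWithGreedy: c a a a d
        System.out.println(feed(Arrays.asList("c", "a", "a", "a", "d")));
        // prints [[c, a, a, a]]
    }
}
```

Under these assumptions only the longest run is reported, and nothing is reported while the loop is still open, which is the trade-off the suggestion implies for a greedy last state.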



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r135366122
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -421,6 +437,15 @@ private void addStopStateToLooping(final State<T> loopingState) {
     				untilCondition,
     				true);
     
    +			if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY) &&
    +				times.getFrom() != times.getTo()) {
    +				if (untilCondition != null) {
    +					State<T> sinkStateCopy = copy(sinkState);
    +					originalStateMap.put(sinkState.getName(), sinkStateCopy);
    --- End diff --
    
    When are the old entries in this map cleared?
    Shall we consider using a map that expires entries by TTL?



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4296



[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    Sure. I have created FLINK-7496 to track this issue.



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131629357
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,1068 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +	@Test
    +	public void testGreedyZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d?
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().consecutive().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional();
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c),
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(43, "d", 3.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(d, 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e),
    +			Lists.newArrayList(c, a1, a2, d, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier() {
    --- End diff --
    
    Typo: `testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier` -> `testGreedyUntilZeroOrMoreWithDummyEventsAfterQuantifier`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r135440842
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -526,18 +551,32 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
     				return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional);
     			}
     
    -			final IterativeCondition<T> trueFunction = getTrueFunction();
    -
     			final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
     			// if event is accepted then all notPatterns previous to the optional states are no longer valid
     			final State<T> sink = copyWithoutTransitiveNots(sinkState);
     			singletonState.addTake(sink, takeCondition);
     
    +			// if no element accepted the previous nots are still valid.
    +			final IterativeCondition<T> proceedCondition = getTrueFunction();
    +
     			// for the first state of a group pattern, its PROCEED edge should point to the following state of
     			// that group pattern and the edge will be added at the end of creating the NFA for that group pattern
     			if (isOptional && !headOfGroup(currentPattern)) {
    -				// if no element accepted the previous nots are still valid.
    -				singletonState.addProceed(proceedState, trueFunction);
    +				if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
    +					final IterativeCondition<T> untilCondition =
    +						(IterativeCondition<T>) currentPattern.getUntilCondition();
    +					if (untilCondition != null) {
    +						singletonState.addProceed(
    +							originalStateMap.get(proceedState.getName()),
    +							new AndCondition<>(proceedCondition, untilCondition));
    --- End diff --
    
    When untilCondition holds, the loop should break and the state should proceed to the next state. This is covered by the test case GreedyITCase#testGreedyUntilWithDummyEventsBeforeQuantifier.
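A minimal sketch of the idea in this comment, assuming plain `java.util.function.Predicate` in place of Flink's `IterativeCondition` and `AndCondition`. The class `UntilProceedSketch`, the type `Ev`, and the method `takeGreedilyUntil` are hypothetical names used only for illustration; this is not the NFA implementation from the PR. It shows an always-true proceed condition combined with the until condition via AND, so the loop is left only once the until condition holds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration only; not part of Flink.
final class UntilProceedSketch {

    // Stand-in for the test Event: only a name and a price.
    static final class Ev {
        final String name;
        final double price;

        Ev(String name, double price) {
            this.name = name;
            this.price = price;
        }
    }

    // Collects events accepted by `take` until `until` holds for some event.
    static List<Ev> takeGreedilyUntil(List<Ev> events, Predicate<Ev> take, Predicate<Ev> until) {
        // Analogue of AndCondition(proceedCondition, untilCondition),
        // where proceedCondition is the always-true function.
        Predicate<Ev> proceed = ((Predicate<Ev>) e -> true).and(until);

        List<Ev> taken = new ArrayList<>();
        for (Ev e : events) {
            if (proceed.test(e)) {
                break; // until holds: leave the loop and move to the next state
            }
            if (take.test(e)) {
                taken.add(e);
            }
        }
        return taken;
    }
}
```

With prices 2.0, 3.0, 3.0, 4.0 for four "a" events and an until condition of `price > 3.0`, the first three events are taken and the fourth ends the loop.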



[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    @dianfu Sorry for the delay, but unfortunately I will not have enough time for a proper review before my vacation. I will get back to it after 24.07.



[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    I've started reviewing it, but realised it works as I expected only when the inner consuming strategy is `STRICT`.
    
    Let's have a look at a test like this one:
    
    	@Test
    	public void testGreedyFollowedByInBetween() {
    		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    
    		Event c = new Event(40, "c", 1.0);
    		Event a1 = new Event(41, "a", 2.0);
    		Event a2 = new Event(42, "a", 2.0);
    		Event a3 = new Event(43, "a", 2.0);
    		Event d = new Event(44, "d", 3.0);
    
    		inputEvents.add(new StreamRecord<>(c, 1));
    		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    		inputEvents.add(new StreamRecord<>(a1, 3));
    		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    		inputEvents.add(new StreamRecord<>(a2, 5));
    		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    		inputEvents.add(new StreamRecord<>(a3, 7));
    		inputEvents.add(new StreamRecord<>(d, 8));
    
    		// c a* d
    		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    			private static final long serialVersionUID = 5726188262756267490L;
    
    			@Override
    			public boolean filter(Event value) throws Exception {
    				return value.getName().equals("c");
    			}
    		}).followedBy("middle").where(new SimpleCondition<Event>() {
    			private static final long serialVersionUID = 5726188262756267490L;
    
    			@Override
    			public boolean filter(Event value) throws Exception {
    				return value.getName().equals("a");
    			}
    		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    			private static final long serialVersionUID = 5726188262756267490L;
    
    			@Override
    			public boolean filter(Event value) throws Exception {
    				return value.getName().equals("d");
    			}
    		});
    
    		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    
    		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    
    		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    			Lists.newArrayList(c, a1, a2, a3, d)
    		));
    	}
    
    I would expect only that one result, but I get:
    
        Lists.newArrayList(c, a1, a2, a3, d),
        Lists.newArrayList(c, a1, a2, d),
        Lists.newArrayList(c, a1, d),
        Lists.newArrayList(c, d)
    
    This is the same result with or without `greedy()` applied.
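To make the difference concrete, here is a toy model in plain Java, not Flink's NFA, of the pattern `c a* d` with relaxed contiguity. The class `GreedyMatchSketch` and the method `matches` are hypothetical, and treating every event whose name starts with "a" as an `a` event is an assumption made for the sketch. In the non-greedy case it emits one match per prefix of the collected `a` events; in the greedy case it emits only the longest one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of Flink.
final class GreedyMatchSketch {

    // Returns the matches of "c a* d" over the event names, ignoring unrelated events.
    static List<List<String>> matches(List<String> events, boolean greedy) {
        List<List<String>> result = new ArrayList<>();
        List<String> as = new ArrayList<>();
        boolean seenC = false;

        for (String e : events) {
            if (!seenC) {
                seenC = e.equals("c");
                continue;
            }
            if (e.startsWith("a")) {
                as.add(e);
            } else if (e.equals("d")) {
                // Greedy keeps only the longest run; non-greedy keeps every prefix.
                int minTaken = greedy ? as.size() : 0;
                for (int k = as.size(); k >= minTaken; k--) {
                    List<String> match = new ArrayList<>();
                    match.add("c");
                    match.addAll(as.subList(0, k));
                    match.add("d");
                    result.add(match);
                }
                break;
            }
            // Any other event is skipped (relaxed contiguity).
        }
        return result;
    }
}
```

For the input `c, dummy, a1, dummy, a2, dummy, a3, d`, the non-greedy call yields the four results listed above, while the greedy call yields only `[c, a1, a2, a3, d]`.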



[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    @dawidwys Regarding `times().greedy()`: the result was indeed not as expected. I have fixed the issue in the latest PR update and also updated the doc.



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r130537682
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -657,25 +663,34 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
     				true);
     
     			IterativeCondition<T> proceedCondition = getTrueFunction();
    -			if (currentPattern.getQuantifier().isGreedy()) {
    -				proceedCondition = getGreedyCondition(proceedCondition, Lists.newArrayList(takeCondition));
    +			if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
    +				proceedCondition = getGreedyCondition(
    +					proceedCondition,
    +					takeCondition,
    +					ignoreCondition,
    +					followingTakeCondition);
     			}
     			final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal,
    -				currentPattern.getQuantifier().isGreedy());
    +				currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY));
     			loopingState.addProceed(sinkState, proceedCondition);
     			loopingState.addTake(takeCondition);
     
     			addStopStateToLooping(loopingState);
     
     			if (ignoreCondition != null) {
     				final State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal,
    -					currentPattern.getQuantifier().isGreedy());
    +					currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY));
     				ignoreState.addTake(loopingState, takeCondition);
     				ignoreState.addIgnore(ignoreCondition);
     				loopingState.addIgnore(ignoreState, ignoreCondition);
    +				if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
    --- End diff --
    
    I think the problem you mentioned with `GreedyITCase.testGreedyZeroOrMoreBeforeOptional2` is caused by that proceed. The whole reason for the additional ignoreState was that it should not have a `PROCEED` transition, since that creates additional computationStates with the `sinkState`. I think that for greedy to work, we need to put a customized ignoreCondition into the `sinkState` of a greedy Pattern. What do you think?



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r129830324
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java ---
    @@ -105,6 +107,14 @@ public void optional() {
     		properties.add(Quantifier.QuantifierProperty.OPTIONAL);
     	}
     
    +	public void greedy() {
    +		greedy = true;
    --- End diff --
    
    Makes sense. Updated.



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131804949
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,1068 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +	@Test
    +	public void testGreedyZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().consecutive().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional();
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c),
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(43, "d", 3.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(d, 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e),
    +			Lists.newArrayList(c, a1, a2, d, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(a3, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList());
    +	}
    +
    +	@Test
    +	public void testGreedyUtilOneOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilOneOrMoreWithDummyEventsBeforeQuantifier() {
    --- End diff --
    
    Updated



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dianfu <gi...@git.apache.org>.
GitHub user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r130883414
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -657,25 +663,34 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
     				true);
     
     			IterativeCondition<T> proceedCondition = getTrueFunction();
    -			if (currentPattern.getQuantifier().isGreedy()) {
    -				proceedCondition = getGreedyCondition(proceedCondition, Lists.newArrayList(takeCondition));
    +			if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
    +				proceedCondition = getGreedyCondition(
    +					proceedCondition,
    +					takeCondition,
    +					ignoreCondition,
    +					followingTakeCondition);
     			}
     			final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal,
    -				currentPattern.getQuantifier().isGreedy());
    +				currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY));
     			loopingState.addProceed(sinkState, proceedCondition);
     			loopingState.addTake(takeCondition);
     
     			addStopStateToLooping(loopingState);
     
     			if (ignoreCondition != null) {
     				final State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal,
    -					currentPattern.getQuantifier().isGreedy());
    +					currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY));
     				ignoreState.addTake(loopingState, takeCondition);
     				ignoreState.addIgnore(ignoreCondition);
     				loopingState.addIgnore(ignoreState, ignoreCondition);
    +				if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
    --- End diff --
    
    @dawidwys Good advice, thanks a lot :). I have updated the PR to follow the solution you suggested.



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dawidwys <gi...@git.apache.org>.
GitHub user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131629581
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,1068 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +	@Test
    +	public void testGreedyZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d? (the "end" stage is optional)
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().consecutive().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional();
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c),
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyZeroOrMoreBeforeOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(43, "d", 3.0);
    +		Event e = new Event(44, "e", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(d, 4));
    +		inputEvents.add(new StreamRecord<>(e, 5));
    +
    +		// c a* d? e
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, e),
    +			Lists.newArrayList(c, a1, a2, d, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(a3, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a* d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreInBetween() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event a3 = new Event(43, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
    +		inputEvents.add(new StreamRecord<>(a1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
    +		inputEvents.add(new StreamRecord<>(a2, 5));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
    +		inputEvents.add(new StreamRecord<>(a3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyOneOrMoreWithDummyEventsBeforeQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event d = new Event(44, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList());
    +	}
    +
    +	@Test
    +	public void testGreedyUtilOneOrMoreWithDummyEventsAfterQuantifier() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 3.0);
    +		Event a3 = new Event(43, "a", 3.0);
    +		Event d = new Event(45, "d", 3.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(a3, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		// c a+ d
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).oneOrMore().greedy().until(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getPrice() > 3.0;
    +			}
    +		}).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, a2, a3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGreedyUtilOneOrMoreWithDummyEventsBeforeQuantifier() {
    --- End diff --
    
    misspell: `testGreedyUtilOneOrMoreWithDummyEventsBeforeQuantifier` -> `testGreedyUntilOneOrMoreWithDummyEventsBeforeQuantifier`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131804764
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -637,9 +675,23 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
     				untilCondition,
     				true);
     
    -			final IterativeCondition<T> proceedCondition = getTrueFunction();
    +			IterativeCondition<T> proceedCondition = getTrueFunction();
     			final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal);
    -			loopingState.addProceed(sinkState, proceedCondition);
    +
    +			if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
    +				State<T> sinkStateCopy = copy(sinkState);
    --- End diff --
    
    Makes sense. Updated.



[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    There is one more problem: when an optional pattern follows a `greedy` one, it does not work well. E.g. have a look at this test case:
    
    	@Test
    	public void testGreedyZeroOrMoreBeforeOptional2() {
    		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    
    		Event c = new Event(40, "c", 1.0);
    		Event a1 = new Event(41, "a", 2.0);
    		Event a2 = new Event(42, "a", 2.0);
    		Event d = new Event(43, "d", 3.0);
    		Event a3 = new Event(42, "a", 2.0);
    		Event e = new Event(44, "e", 3.0);
    
    		inputEvents.add(new StreamRecord<>(c, 1));
    		inputEvents.add(new StreamRecord<>(a1, 2));
    		inputEvents.add(new StreamRecord<>(a2, 3));
    		inputEvents.add(new StreamRecord<>(d, 4));
    		inputEvents.add(new StreamRecord<>(a3, 5));
    		inputEvents.add(new StreamRecord<>(e, 6));
    
    		// c a* d? e
    		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    			private static final long serialVersionUID = 5726188262756267490L;
    
    			@Override
    			public boolean filter(Event value) throws Exception {
    				return value.getName().equals("c");
    			}
    		}).followedBy("middle1").where(new SimpleCondition<Event>() {
    			private static final long serialVersionUID = 5726188262756267490L;
    
    			@Override
    			public boolean filter(Event value) throws Exception {
    				return value.getName().equals("a");
    			}
    		}).oneOrMore().optional().greedy().followedBy("middle2").where(new SimpleCondition<Event>() {
    			private static final long serialVersionUID = 5726188262756267490L;
    
    			@Override
    			public boolean filter(Event value) throws Exception {
    				return value.getName().equals("d");
    			}
    		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
    			private static final long serialVersionUID = 5726188262756267490L;
    
    			@Override
    			public boolean filter(Event value) throws Exception {
    				return value.getName().equals("e");
    			}
    		});
    
    		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    
    		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    
    		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    			Lists.newArrayList(c, a1, a2, a3, e),
    			Lists.newArrayList(c, a1, a2, d, e)
    		));
    	}
    
    Right now it also returns `c a1 a2 e`, which I think is not correct. I don't think there is an easy way to fix it at the moment. I would suggest enforcing, at the Pattern level, that a greedy pattern must not be followed by an `Optional` pattern. I would like to hear opinions on that, @kl0u.
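
    A rough sketch of what such a Pattern-level restriction could look like. `SketchPattern`, its `Property` enum and the use of `IllegalStateException` are hypothetical stand-ins chosen for illustration; this is not Flink's `Pattern` class and not necessarily how the check would be implemented there. The assumption is only that `optional()` can inspect the quantifier properties of the preceding pattern.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for a CEP pattern builder; NOT Flink's Pattern class.
final class SketchPattern {
    enum Property { LOOPING, OPTIONAL, GREEDY }

    private final String name;
    private final SketchPattern previous; // null for the first pattern in the chain
    private final Set<Property> properties = EnumSet.noneOf(Property.class);

    private SketchPattern(String name, SketchPattern previous) {
        this.name = name;
        this.previous = previous;
    }

    static SketchPattern begin(String name) {
        return new SketchPattern(name, null);
    }

    // Starts a new pattern that remembers the one it follows.
    SketchPattern followedBy(String nextName) {
        return new SketchPattern(nextName, this);
    }

    SketchPattern oneOrMore() {
        properties.add(Property.LOOPING);
        return this;
    }

    SketchPattern greedy() {
        properties.add(Property.GREEDY);
        return this;
    }

    // Rejects "greedy followed by optional" at pattern-construction time.
    SketchPattern optional() {
        if (previous != null && previous.properties.contains(Property.GREEDY)) {
            throw new IllegalStateException(
                "Pattern '" + name + "' cannot be optional: it follows greedy pattern '"
                    + previous.name + "'");
        }
        properties.add(Property.OPTIONAL);
        return this;
    }

    String getName() {
        return name;
    }

    public static void main(String[] args) {
        // c a* e: optional() is applied to the greedy pattern itself, which is allowed.
        SketchPattern ok = begin("start")
            .followedBy("middle1").oneOrMore().optional().greedy()
            .followedBy("end");
        System.out.println("accepted, last pattern: " + ok.getName());

        // c a* d? e: "middle2" is optional and directly follows a greedy pattern.
        try {
            begin("start")
                .followedBy("middle1").oneOrMore().optional().greedy()
                .followedBy("middle2").optional();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

    With a check of this kind, the pattern from the test case above would fail when it is built rather than produce the extra `c a1 a2 e` match at runtime.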



[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4296
  
    @dawidwys OK. Have a good time. :)



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r129830367
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
    @@ -492,4 +506,10 @@ private void checkIfQuantifierApplied() {
     					"Current quantifier is: " + quantifier);
     		}
     	}
    +
    +	private void checkIfNoFollowedByAny() {
    +		if (quantifier.getConsumingStrategy() == ConsumingStrategy.SKIP_TILL_ANY) {
    --- End diff --
    
    Makes sense. Updated.



[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131628087
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -637,9 +675,23 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
     				untilCondition,
     				true);
     
    -			final IterativeCondition<T> proceedCondition = getTrueFunction();
    +			IterativeCondition<T> proceedCondition = getTrueFunction();
     			final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal);
    -			loopingState.addProceed(sinkState, proceedCondition);
    +
    +			if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
    +				State<T> sinkStateCopy = copy(sinkState);
    --- End diff --
    
    If I understood correctly, the copies are needed only in cases where there is an `untilCondition`. Am I right? If so, let's create the copy only then. Right now there are dangling copies of the next state when there is no `untilCondition`.
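
    A minimal sketch of that suggestion, assuming the reading above is right (the copy is only needed when the pattern is greedy and has an until condition). `LazyCopySketch`, `State` and `proceedTarget` are hypothetical names for illustration; they are not Flink's `State` class or the `NFACompiler` code under review.

```java
// Hypothetical stand-ins; NOT Flink's State or NFACompiler.
final class LazyCopySketch {
    static final class State {
        final String name;

        State(String name) {
            this.name = name;
        }

        // Creates a fresh state with the same name.
        State copy() {
            return new State(name);
        }
    }

    // Returns the state a PROCEED edge should point at.
    // A copy is created only when it will actually be wired into the graph,
    // so no dangling copy exists when there is no until condition.
    static State proceedTarget(State sinkState, boolean greedy, boolean hasUntilCondition) {
        if (greedy && hasUntilCondition) {
            return sinkState.copy();
        }
        return sinkState;
    }

    public static void main(String[] args) {
        State sink = new State("end");
        // Greedy without an until condition: the original sink state is reused.
        System.out.println(proceedTarget(sink, true, false) == sink);
        // Greedy with an until condition: a separate copy is returned.
        System.out.println(proceedTarget(sink, true, true) == sink);
    }
}
```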

