You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/08/22 18:04:23 UTC

flink git commit: [FLINK-7123] [cep] Support timesOrMore in CEP

Repository: flink
Updated Branches:
  refs/heads/master 40cec17f4 -> 9995588c8


[FLINK-7123] [cep] Support timesOrMore in CEP

This closes #4523.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9995588c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9995588c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9995588c

Branch: refs/heads/master
Commit: 9995588c83536e04aef3425757a008ba4c78dbde
Parents: 40cec17
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Fri Aug 11 11:29:01 2017 +0800
Committer: kkloudas <kk...@gmail.com>
Committed: Tue Aug 22 18:53:27 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  37 ++
 .../flink/cep/scala/pattern/Pattern.scala       |  18 +-
 .../flink/cep/nfa/compiler/NFACompiler.java     |  68 +--
 .../org/apache/flink/cep/pattern/Pattern.java   |  23 +-
 .../apache/flink/cep/pattern/Quantifier.java    |   2 +-
 .../apache/flink/cep/nfa/TimesOrMoreITCase.java | 562 +++++++++++++++++++
 6 files changed, 653 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index fef1967..bddb9b2 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -186,6 +186,13 @@ named `start`, the following are valid quantifiers:
 
  // expecting 0 or more occurrences
  start.oneOrMore().optional();
+
+ // expecting 2 or more occurrences
+ start.timesOrMore(2);
+
+ // expecting 0, 2 or more occurrences
+ start.timesOrMore(2).optional();
+
  {% endhighlight %}
  </div>
 
@@ -208,6 +215,12 @@ named `start`, the following are valid quantifiers:
 
  // expecting 0 or more occurrences
  start.oneOrMore().optional()
+
+ // expecting 2 or more occurrences
+ start.timesOrMore(2)
+
+ // expecting 0, 2 or more occurrences
+ start.timesOrMore(2).optional()
  {% endhighlight %}
  </div>
  </div>
@@ -477,6 +490,18 @@ pattern.oneOrMore();
 {% endhighlight %}
           </td>
        </tr>
+           <tr>
+              <td><strong>timesOrMore(#times)</strong></td>
+              <td>
+                  <p>Specifies that this pattern expects at least <strong>#times</strong> occurrences
+                  of a matching event.</p>
+                  <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
+                  internal contiguity see <a href="#consecutive_java">consecutive</a>.</p>
+{% highlight java %}
+pattern.timesOrMore(2);
+{% endhighlight %}
+           </td>
+       </tr>
        <tr>
           <td><strong>times(#ofTimes)</strong></td>
           <td>
@@ -648,6 +673,18 @@ pattern.oneOrMore()
           </td>
        </tr>
        <tr>
+          <td><strong>timesOrMore(#times)</strong></td>
+          <td>
+              <p>Specifies that this pattern expects at least <strong>#times</strong> occurrences
+              of a matching event.</p>
+              <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
+              internal contiguity see <a href="#consecutive_scala">consecutive</a>.</p>
+{% highlight scala %}
+pattern.timesOrMore(2)
+{% endhighlight %}
+           </td>
+       </tr>
+       <tr>
                  <td><strong>times(#ofTimes)</strong></td>
                  <td>
                      <p>Specifies that this pattern expects an exact number of occurrences of a matching event.</p>

http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 5daebe0..5b41b90 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -344,7 +344,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     * {{{A1 A2 B}}} appears, this will generate patterns:
     * {{{A1 B}}} and {{{A1 A2 B}}}. See also {{{allowCombinations()}}}.
     *
-    * @return The same pattern with a [[Quantifier.oneOrMore()]] quantifier applied.
+    * @return The same pattern with a [[Quantifier.looping()]] quantifier applied.
     * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
     */
   def oneOrMore: Pattern[T, F] = {
@@ -378,7 +378,21 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   }
 
   /**
-    * Applicable only to [[Quantifier.oneOrMore()]] and [[Quantifier.times()]] patterns,
+    * Specifies that this pattern can occur at least the specified number of times.
+    * This means that at least `times` events, and at most an unbounded number of
+    * events, can be matched to this pattern.
+    *
+    * @param times The minimum number of occurrences of a matching event.
+    * @return The same pattern with a [[Quantifier.looping()]] quantifier applied.
+    * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
+    */
+  def timesOrMore(times: Int): Pattern[T, F] = {
+    jPattern.timesOrMore(times)
+    this
+  }
+
+  /**
+    * Applicable only to [[Quantifier.looping()]] and [[Quantifier.times()]] patterns,
     * this option allows more flexibility to the matching events.
     *
     * If {{{allowCombinations()}}} is not applied for a

http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 9dc1837..4d4baca 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -291,13 +291,9 @@ public class NFACompiler {
 				final State<T> looping = createLooping(sink);
 
 				setCurrentGroupPatternFirstOfLoop(true);
-				if (!quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
-					lastSink = createInitMandatoryStateOfOneOrMore(looping);
-				} else {
-					lastSink = createInitOptionalStateOfZeroOrMore(looping, sinkState);
-				}
+				lastSink = createTimesState(looping, sinkState, currentPattern.getTimes());
 			} else if (quantifier.hasProperty(Quantifier.QuantifierProperty.TIMES)) {
-				lastSink = createTimesState(sinkState, currentPattern.getTimes());
+				lastSink = createTimesState(sinkState, sinkState, currentPattern.getTimes());
 			} else {
 				lastSink = createSingletonState(sinkState);
 			}
@@ -407,16 +403,26 @@ public class NFACompiler {
 		 * same {@link IterativeCondition}.
 		 *
 		 * @param sinkState the state that the created state should point to
+		 * @param proceedState state that the state being converted should proceed to
 		 * @param times     number of times the state should be copied
 		 * @return the first state of the "complex" state, next state should point to it
 		 */
-		private State<T> createTimesState(final State<T> sinkState, Times times) {
+		@SuppressWarnings("unchecked")
+		private State<T> createTimesState(final State<T> sinkState, final State<T> proceedState, Times times) {
 			State<T> lastSink = sinkState;
 			setCurrentGroupPatternFirstOfLoop(false);
-			final IterativeCondition<T> takeCondition = getTakeCondition(currentPattern);
-			final IterativeCondition<T> innerIgnoreCondition = getInnerIgnoreCondition(currentPattern);
+			final IterativeCondition<T> untilCondition = (IterativeCondition<T>) currentPattern.getUntilCondition();
+			final IterativeCondition<T> innerIgnoreCondition = extendWithUntilCondition(
+				getInnerIgnoreCondition(currentPattern),
+				untilCondition,
+				false);
+			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
+				getTakeCondition(currentPattern),
+				untilCondition,
+				true);
+
 			for (int i = times.getFrom(); i < times.getTo(); i++) {
-				lastSink = createSingletonState(lastSink, sinkState, takeCondition, innerIgnoreCondition, true);
+				lastSink = createSingletonState(lastSink, proceedState, takeCondition, innerIgnoreCondition, true);
 				addStopStateToLooping(lastSink);
 			}
 			for (int i = 0; i < times.getFrom() - 1; i++) {
@@ -427,7 +433,7 @@ public class NFACompiler {
 			setCurrentGroupPatternFirstOfLoop(true);
 			return createSingletonState(
 				lastSink,
-				sinkState,
+				proceedState,
 				takeCondition,
 				getIgnoreCondition(currentPattern),
 				currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
@@ -656,46 +662,6 @@ public class NFACompiler {
 		}
 
 		/**
-		 * Patterns with quantifiers AT_LEAST_ONE_* are created as a pair of states: a singleton state and
-		 * looping state. This method creates the first of the two.
-		 *
-		 * @param sinkState the state the newly created state should point to, it should be a looping state
-		 * @return the newly created state
-		 */
-		@SuppressWarnings("unchecked")
-		private State<T> createInitMandatoryStateOfOneOrMore(final State<T> sinkState) {
-			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
-				getTakeCondition(currentPattern),
-				(IterativeCondition<T>) currentPattern.getUntilCondition(),
-				true
-			);
-
-			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
-
-			return createSingletonState(sinkState, null, takeCondition, ignoreCondition, false);
-		}
-
-		/**
-		 * Creates a pair of states that enables relaxed strictness before a zeroOrMore looping state.
-		 *
-		 * @param loopingState the first state of zeroOrMore complex state
-		 * @param lastSink     the state that the looping one points to
-		 * @return the newly created state
-		 */
-		@SuppressWarnings("unchecked")
-		private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
-			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
-				getTakeCondition(currentPattern),
-				(IterativeCondition<T>) currentPattern.getUntilCondition(),
-				true
-			);
-
-			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
-
-			return createSingletonState(loopingState, lastSink, takeCondition, ignoreFunction, true);
-		}
-
-		/**
 		 * This method extends the given condition with stop(until) condition if necessary.
 		 * The until condition needs to be applied only if both of the given conditions are not null.
 		 *

http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 2ffbc41..adf1397 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -326,13 +326,14 @@ public class Pattern<T, F extends T> {
 	 * {@code A1 A2 B} appears, this will generate patterns:
 	 * {@code A1 B} and {@code A1 A2 B}. See also {@link #allowCombinations()}.
 	 *
-	 * @return The same pattern with a {@link Quantifier#oneOrMore(ConsumingStrategy)} quantifier applied.
+	 * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier applied.
 	 * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
 	 */
 	public Pattern<T, F> oneOrMore() {
 		checkIfNoNotPattern();
 		checkIfQuantifierApplied();
-		this.quantifier = Quantifier.oneOrMore(quantifier.getConsumingStrategy());
+		this.quantifier = Quantifier.looping(quantifier.getConsumingStrategy());
+		this.times = Times.of(1);
 		return this;
 	}
 
@@ -375,7 +376,23 @@ public class Pattern<T, F extends T> {
 	}
 
 	/**
-	 * Applicable only to {@link Quantifier#oneOrMore(ConsumingStrategy)} and
+	 * Specifies that this pattern can occur at least the specified number of times, i.e. at least
+	 * {@code times} and at most an unbounded number of events can be matched to this pattern.
+	 *
+	 * @param times the minimum number of occurrences of a matching event
+	 * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier applied.
+	 * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
+	 */
+	public Pattern<T, F> timesOrMore(int times) {
+		checkIfNoNotPattern();
+		checkIfQuantifierApplied();
+		this.quantifier = Quantifier.looping(quantifier.getConsumingStrategy());
+		this.times = Times.of(times);
+		return this;
+	}
+
+	/**
+	 * Applicable only to {@link Quantifier#looping(ConsumingStrategy)} and
 	 * {@link Quantifier#times(ConsumingStrategy)} patterns, this option allows more flexibility to the matching events.
 	 *
 	 * <p>If {@code allowCombinations()} is not applied for a

http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 9192a13..2136706 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -55,7 +55,7 @@ public class Quantifier {
 		return new Quantifier(consumingStrategy, QuantifierProperty.SINGLE);
 	}
 
-	public static Quantifier oneOrMore(final ConsumingStrategy consumingStrategy) {
+	public static Quantifier looping(final ConsumingStrategy consumingStrategy) {
 		return new Quantifier(consumingStrategy, QuantifierProperty.LOOPING);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
new file mode 100644
index 0000000..4e540dd
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * Tests for {@link Pattern#timesOrMore(int)}.
+ */
+public class TimesOrMoreITCase extends TestLogger {
+	@Test
+	public void testTimesOrMore() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		// c a{2,} b
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).timesOrMore(2).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1)
+		));
+	}
+
+	@Test
+	public void testTimesOrMoreNonStrict() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		// c a{2,} b
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).timesOrMore(2).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesOrMoreStrict() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		// c a{2,} b, strict contiguity inside the loop: "f" between middleEvent1 and middleEvent2 breaks the run
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).timesOrMore(2).consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesOrMoreStrictOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		// c a{2,} b
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).timesOrMore(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesOrMoreStrictOptional2() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		// c a{2,}, b
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).next("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).timesOrMore(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesOrMoreNonStrictOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		// c a{2,} b
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).timesOrMore(2).optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesOrMoreNonStrictOptional2() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		// c a{2,} b
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).timesOrMore(2).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesOrMoreNonStrictOptional3() { // optional "middle" loop reached via followedByAny, without allowCombinations()
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); // filler event: "f" is none of the names ("c", "a", "b") the conditions test for
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); // second filler event, placed between middleEvent1 and middleEvent2
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		// c (a{2,})? b -- "middle" is timesOrMore(2).optional(), i.e. zero or at least two "a" events
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).timesOrMore(2).optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( // expected matches; {middleEvent1, middleEvent3} on its own is not among them
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) // "middle" left out entirely (the optional case)
+		));
+	}
+
+	@Test
+	public void testTimesOrMoreNonStrictWithNext() { // "middle" loop attached with next() and combined with allowCombinations()
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3)); // filler event: "f" is none of the names ("c", "a", "b") the conditions test for
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5)); // second filler event, placed between middleEvent2 and middleEvent3
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		// c a{2,} b -- "middle" follows "start" via next(); the loop itself uses allowCombinations()
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).next("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).timesOrMore(2).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( // every expected match contains middleEvent1, the event fed directly after startEvent
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesOrMoreNotStrictWithFollowedBy() { // "middle" loop attached with followedBy(), no further modifiers on the quantifier
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		// c a{2,} b -- "middle" follows "start" via followedBy(); neither optional() nor allowCombinations() is applied
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).timesOrMore(2).followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( // both expected matches begin the loop with middleEvent1 followed by middleEvent2
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesOrMoreNotStrictWithFollowedByAny() { // "middle" loop attached with followedByAny() and combined with allowCombinations()
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		// c a{2,} b -- "middle" follows "start" via followedByAny(); the loop itself uses allowCombinations()
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).timesOrMore(2).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( // expected: every selection of at least two of the three "a" events, in input order
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+		));
+	}
+
+	private static class ConsecutiveData { // shared Event fixtures referenced by the test methods above
+		private static final Event startEvent = new Event(40, "c", 1.0); // "c" is presumably the name read by getName(); the "start" conditions test for it
+		private static final Event middleEvent1 = new Event(41, "a", 2.0); // "a": the value the "middle" conditions test for
+		private static final Event middleEvent2 = new Event(42, "a", 3.0); // second "a" event
+		private static final Event middleEvent3 = new Event(43, "a", 4.0); // third "a" event
+		private static final Event end = new Event(44, "b", 5.0); // "b": the value the "end1" conditions test for
+
+		private ConsecutiveData() { // private and empty: the visible tests only access the static fields
+		}
+	}
+}