You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/06/20 06:41:53 UTC

flink git commit: [FLINK-6904] [cep] Support for quantifier range to CEP's pattern API

Repository: flink
Updated Branches:
  refs/heads/finish-pr-4121 [created] 8835da996


[FLINK-6904] [cep] Support for quantifier range to CEP's pattern API

This closes #4121


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8835da99
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8835da99
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8835da99

Branch: refs/heads/finish-pr-4121
Commit: 8835da996f61222ef7291d73b5160e0405d81b16
Parents: 6cf6cb8
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Wed Jun 14 11:19:41 2017 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Tue Jun 20 08:34:37 2017 +0200

----------------------------------------------------------------------
 .../flink/cep/scala/pattern/Pattern.scala       |  13 +
 .../flink/cep/nfa/compiler/NFACompiler.java     |  49 +-
 .../org/apache/flink/cep/pattern/Pattern.java   |  27 +-
 .../apache/flink/cep/pattern/Quantifier.java    |  33 ++
 .../org/apache/flink/cep/nfa/NFAITCase.java     |   1 -
 .../apache/flink/cep/nfa/TimesRangeITCase.java  | 564 +++++++++++++++++++
 6 files changed, 657 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 3a30836..270b2f5 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -316,6 +316,19 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   }
 
   /**
+    * Specifies that the pattern can occur between from and to times.
+    *
+    * @param from number of times matching event must appear at least
+    * @param to   number of times matching event must appear at most
+    * @return The same pattern with the number of times range applied
+    * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
+    */
+  def times(from: Int, to: Int): Pattern[T, F] = {
+    jPattern.times(from, to)
+    this
+  }
+
+  /**
     * Applicable only to [[Quantifier.oneOrMore()]] and [[Quantifier.times()]] patterns,
     * this option allows more flexibility to the matching events.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index ce42acd..b5a437b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -28,6 +28,7 @@ import org.apache.flink.cep.nfa.StateTransitionAction;
 import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.Quantifier.Times;
 import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.NotCondition;
@@ -372,33 +373,23 @@ public class NFACompiler {
 		 * @param times     number of times the state should be copied
 		 * @return the first state of the "complex" state, next state should point to it
 		 */
-		private State<T> createTimesState(final State<T> sinkState, int times) {
-			State<T> lastSink = copyWithoutTransitiveNots(sinkState);
-			for (int i = 0; i < times - 1; i++) {
-				lastSink = createSingletonState(lastSink, getInnerIgnoreCondition(currentPattern), false);
+		private State<T> createTimesState(final State<T> sinkState, Times times) {
+			State<T> lastSink = sinkState;
+			final IterativeCondition<T> innerIgnoreCondition = getInnerIgnoreCondition(currentPattern);
+			for (int i = times.getFrom(); i < times.getTo(); i++) {
+				lastSink = createSingletonState(lastSink, sinkState, innerIgnoreCondition, true);
 				addStopStateToLooping(lastSink);
 			}
-
-			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
-			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
-
-			// we created the intermediate states in the loop, now we create the start of the loop.
-			if (!currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
-				return createSingletonState(lastSink, ignoreCondition, false);
-			}
-
-			final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
-			singletonState.addTake(lastSink, currentCondition);
-			singletonState.addProceed(sinkState, BooleanConditions.<T>trueFunction());
-
-			if (ignoreCondition != null) {
-				State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal);
-				ignoreState.addTake(lastSink, currentCondition);
-				ignoreState.addIgnore(ignoreCondition);
-				singletonState.addIgnore(ignoreState, ignoreCondition);
-				addStopStates(ignoreState);
+			for (int i = 0; i < times.getFrom() - 1; i++) {
+				lastSink = createSingletonState(lastSink, null, innerIgnoreCondition, false);
+				addStopStateToLooping(lastSink);
 			}
-			return singletonState;
+			// we created the intermediate states in the loop, now we create the start of the loop.
+			return createSingletonState(
+				lastSink,
+				sinkState,
+				getIgnoreCondition(currentPattern),
+				currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
 		}
 
 		/**
@@ -413,6 +404,7 @@ public class NFACompiler {
 		private State<T> createSingletonState(final State<T> sinkState) {
 			return createSingletonState(
 				sinkState,
+				sinkState,
 				getIgnoreCondition(currentPattern),
 				currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
 		}
@@ -424,10 +416,15 @@ public class NFACompiler {
 		 *
 		 * @param ignoreCondition condition that should be applied to IGNORE transition
 		 * @param sinkState state that the state being converted should point to
+		 * @param proceedState state that the state being converted should proceed to
+		 * @param isOptional whether the state being converted is optional
 		 * @return the created state
 		 */
 		@SuppressWarnings("unchecked")
-		private State<T> createSingletonState(final State<T> sinkState, final IterativeCondition<T> ignoreCondition, final boolean isOptional) {
+		private State<T> createSingletonState(final State<T> sinkState,
+			final State<T> proceedState,
+			final IterativeCondition<T> ignoreCondition,
+			final boolean isOptional) {
 			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
 			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
 
@@ -438,7 +435,7 @@ public class NFACompiler {
 
 			if (isOptional) {
 				// if no element accepted the previous nots are still valid.
-				singletonState.addProceed(sinkState, trueFunction);
+				singletonState.addProceed(proceedState, trueFunction);
 			}
 
 			if (ignoreCondition != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 2676994..8767a94 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -21,6 +21,7 @@ package org.apache.flink.cep.pattern;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
+import org.apache.flink.cep.pattern.Quantifier.Times;
 import org.apache.flink.cep.pattern.conditions.AndCondition;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.OrCondition;
@@ -64,7 +65,7 @@ public class Pattern<T, F extends T> {
 	 * Applicable to a {@code times} pattern, and holds
 	 * the number of times it has to appear.
 	 */
-	private int times;
+	private Times times;
 
 	protected Pattern(final String name, final Pattern<T, ? extends T> previous) {
 		this.name = name;
@@ -84,7 +85,7 @@ public class Pattern<T, F extends T> {
 		return previous;
 	}
 
-	public int getTimes() {
+	public Times getTimes() {
 		return times;
 	}
 
@@ -318,7 +319,27 @@ public class Pattern<T, F extends T> {
 		checkIfQuantifierApplied();
 		Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
 		this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
-		this.times = times;
+		this.times = Times.of(times);
+		return this;
+	}
+
+	/**
+	 * Specifies that the pattern can occur between from and to times.
+	 *
+	 * @param from number of times matching event must appear at least
+	 * @param to number of times matching event must appear at most
+	 * @return The same pattern with the number of times range applied
+	 *
+	 * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
+	 */
+	public Pattern<T, F> times(int from, int to) {
+		checkIfNoNotPattern();
+		checkIfQuantifierApplied();
+		this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
+		if (from == 0) {
+			this.quantifier.optional();
+		}
+		this.times = Times.of(from, to);
 		return this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index efc7cf4..504fec0 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.pattern;
 
+import org.apache.flink.util.Preconditions;
+
 import java.util.EnumSet;
 import java.util.Objects;
 
@@ -143,4 +145,35 @@ public class Quantifier {
 		NOT_NEXT
 	}
 
+	/**
+	 * Describe the times this {@link Pattern} can occur.
+	 */
+	public static class Times {
+		private final int from;
+		private final int to;
+
+		private Times(int from, int to) {
+			Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
+			Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
+			Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both equal to 0.");
+			this.from = from;
+			this.to = to;
+		}
+
+		public int getFrom() {
+			return from;
+		}
+
+		public int getTo() {
+			return to;
+		}
+
+		public static Times of(int from, int to) {
+			return new Times(from, to);
+		}
+
+		public static Times of(int times) {
+			return new Times(times, times);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 20cb482..506587b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -1904,7 +1904,6 @@ public class NFAITCase extends TestLogger {
 				Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
 		));
 	}
-
 	///////////////////////////////         Consecutive           ////////////////////////////////////////
 
 	private static class ConsecutiveData {

http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
new file mode 100644
index 0000000..4305fa2
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * Tests for {@link Pattern#times(int, int)}.
+ */
+@SuppressWarnings("unchecked")
+public class TimesRangeITCase extends TestLogger {
+
+	@Test
+	public void testTimesRange() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).next("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(1, 3).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, end1)
+		));
+	}
+
+	@Test
+	public void testTimesRangeNonStrict() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(1, 3).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesRangeStrict() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(0, 3).consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesRangeStrictOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(1, 3).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesRangeStrictOptional1() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).next("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(1, 3).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesRangeNonStrictOptional1() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(1, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesRangeNonStrictOptional2() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2, 3).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesRangeNonStrictOptional3() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesRangeNonStrictWithNext() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).next("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesRangeNotStrictWithFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2, 3).followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end)
+		));
+	}
+
+	@Test
+	public void testTimesRangeNotStrictWithFollowedByAny() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+		));
+	}
+
+	private static class ConsecutiveData {
+		private static final Event startEvent = new Event(40, "c", 1.0);
+		private static final Event middleEvent1 = new Event(41, "a", 2.0);
+		private static final Event middleEvent2 = new Event(42, "a", 3.0);
+		private static final Event middleEvent3 = new Event(43, "a", 4.0);
+		private static final Event end = new Event(44, "b", 5.0);
+
+		private ConsecutiveData() {
+		}
+	}
+}