You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/03/23 12:47:24 UTC

[1/3] flink git commit: [FLINK-3318] Backward compatibility of CEP NFA

Repository: flink
Updated Branches:
  refs/heads/master d0695c054 -> d20fb090c


[FLINK-3318] Backward compatibility of CEP NFA


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20fb090
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20fb090
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20fb090

Branch: refs/heads/master
Commit: d20fb090c31858bc0372a8c84228d796558d56b0
Parents: 9001c4e
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Tue Mar 21 10:53:07 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Thu Mar 23 10:47:55 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 162 ++++++-
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  49 ++
 .../java/org/apache/flink/cep/nfa/State.java    |  19 +-
 .../flink/cep/nfa/compiler/NFACompiler.java     | 116 +++++
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 194 +++++++-
 .../cep/operator/CEPMigration12to13Test.java    | 477 +++++++++++++++++++
 .../test/resources/cep-branching-snapshot-1.2   | Bin 0 -> 6736 bytes
 .../resources/cep-single-pattern-snapshot-1.2   | Bin 0 -> 3311 bytes
 .../test/resources/cep-starting-snapshot-1.2    | Bin 0 -> 6526 bytes
 9 files changed, 986 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 3d42248..ab03566 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedHashMultimap;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -25,18 +27,22 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
+import javax.annotation.Nullable;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.OptionalDataException;
 import java.io.Serializable;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -85,9 +91,9 @@ public class NFA<T> implements Serializable {
 	private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer;
 
 	/**
-	 * 	Buffer used to store the matched events.
+	 * 	Used only for backward compatibility. Buffer used to store the matched events.
 	 */
-	private final SharedBuffer<String, T> sharedBuffer;
+	private final SharedBuffer<State<T>, T> sharedBuffer = null;
 
 	/**
 	 * A set of all the valid NFA states, as returned by the
@@ -110,12 +116,22 @@ public class NFA<T> implements Serializable {
 	private final boolean handleTimeout;
 
 	/**
+	 * Used only for backward compatibility.
+	 */
+	private int startEventCounter;
+
+	/**
 	 * Current set of {@link ComputationState computation states} within the state machine.
 	 * These are the "active" intermediate states that are waiting for new matching
 	 * events to transition to new valid states.
 	 */
 	private transient Queue<ComputationState<T>> computationStates;
 
+	/**
+	 * 	Buffer used to store the matched events.
+	 */
+	private final SharedBuffer<String, T> stringSharedBuffer;
+
 	public NFA(
 			final TypeSerializer<T> eventSerializer,
 			final long windowTime,
@@ -124,7 +140,7 @@ public class NFA<T> implements Serializable {
 		this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
 		this.windowTime = windowTime;
 		this.handleTimeout = handleTimeout;
-		sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+		stringSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
 		computationStates = new LinkedList<>();
 
 		states = new HashSet<>();
@@ -156,7 +172,7 @@ public class NFA<T> implements Serializable {
 	 * {@code false} otherwise.
 	 */
 	public boolean isEmpty() {
-		return sharedBuffer.isEmpty();
+		return stringSharedBuffer.isEmpty();
 	}
 
 	/**
@@ -194,9 +210,14 @@ public class NFA<T> implements Serializable {
 					}
 				}
 
-				// remove computation state which has exceeded the window length
-				sharedBuffer.release(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
-				sharedBuffer.remove(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
+				stringSharedBuffer.release(
+						computationState.getPreviousState().getName(),
+						computationState.getEvent(),
+						computationState.getTimestamp());
+				stringSharedBuffer.remove(
+						computationState.getPreviousState().getName(),
+						computationState.getEvent(),
+						computationState.getTimestamp());
 
 				newComputationStates = Collections.emptyList();
 			} else if (event != null) {
@@ -212,8 +233,8 @@ public class NFA<T> implements Serializable {
 					result.addAll(matches);
 
 					// remove found patterns because they are no longer needed
-					sharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
-					sharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+					stringSharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+					stringSharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
 				} else {
 					// add new computation state; it will be processed once the next event arrives
 					computationStates.add(newComputationState);
@@ -230,7 +251,7 @@ public class NFA<T> implements Serializable {
 
 				// remove all elements which are expired
 				// with respect to the window length
-				sharedBuffer.prune(pruningTimestamp);
+				stringSharedBuffer.prune(pruningTimestamp);
 			}
 		}
 
@@ -244,7 +265,7 @@ public class NFA<T> implements Serializable {
 			NFA<T> other = (NFA<T>) obj;
 
 			return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
-				sharedBuffer.equals(other.sharedBuffer) &&
+				stringSharedBuffer.equals(other.stringSharedBuffer) &&
 				states.equals(other.states) &&
 				windowTime == other.windowTime;
 		} else {
@@ -254,7 +275,7 @@ public class NFA<T> implements Serializable {
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime);
+		return Objects.hash(nonDuplicatingTypeSerializer, stringSharedBuffer, states, windowTime);
 	}
 
 	private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
@@ -376,7 +397,7 @@ public class NFA<T> implements Serializable {
 								computationState.getStartTimestamp()
 							)
 						);
-						sharedBuffer.lock(
+						stringSharedBuffer.lock(
 							computationState.getPreviousState().getName(),
 							computationState.getEvent(),
 							computationState.getTimestamp());
@@ -397,14 +418,14 @@ public class NFA<T> implements Serializable {
 					final long startTimestamp;
 					if (computationState.isStartState()) {
 						startTimestamp = timestamp;
-						sharedBuffer.put(
+						stringSharedBuffer.put(
 							consumingState.getName(),
 							event,
 							timestamp,
 							currentVersion);
 					} else {
 						startTimestamp = computationState.getStartTimestamp();
-						sharedBuffer.put(
+						stringSharedBuffer.put(
 							consumingState.getName(),
 							event,
 							timestamp,
@@ -415,7 +436,7 @@ public class NFA<T> implements Serializable {
 					}
 
 					// a new computation state is referring to the shared entry
-					sharedBuffer.lock(consumingState.getName(), event, timestamp);
+					stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
 
 					resultingComputationStates.add(ComputationState.createState(
 						newState,
@@ -429,7 +450,7 @@ public class NFA<T> implements Serializable {
 					//check if newly created state is optional (have a PROCEED path to Final state)
 					final State<T> finalState = findFinalStateAfterProceed(newState, event);
 					if (finalState != null) {
-						sharedBuffer.lock(consumingState.getName(), event, timestamp);
+						stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
 						resultingComputationStates.add(ComputationState.createState(
 							finalState,
 							consumingState,
@@ -450,12 +471,12 @@ public class NFA<T> implements Serializable {
 
 		if (computationState.getEvent() != null) {
 			// release the shared entry referenced by the current computation state.
-			sharedBuffer.release(
+			stringSharedBuffer.release(
 				computationState.getPreviousState().getName(),
 				computationState.getEvent(),
 				computationState.getTimestamp());
 			// try to remove unnecessary shared buffer entries
-			sharedBuffer.remove(
+			stringSharedBuffer.remove(
 				computationState.getPreviousState().getName(),
 				computationState.getEvent(),
 				computationState.getTimestamp());
@@ -546,7 +567,7 @@ public class NFA<T> implements Serializable {
 	 * @return Collection of event sequences which end in the given computation state
 	 */
 	private Collection<Map<String, T>> extractPatternMatches(final ComputationState<T> computationState) {
-		Collection<LinkedHashMultimap<String, T>> paths = sharedBuffer.extractPatterns(
+		Collection<LinkedHashMultimap<String, T>> paths = stringSharedBuffer.extractPatterns(
 			computationState.getPreviousState().getName(),
 			computationState.getEvent(),
 			computationState.getTimestamp(),
@@ -592,6 +613,8 @@ public class NFA<T> implements Serializable {
 		nonDuplicatingTypeSerializer.clearReferences();
 	}
 
+	private final static String BEGINNING_STATE_NAME = "$beginningState$";
+
 	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		ois.defaultReadObject();
 
@@ -599,15 +622,103 @@ public class NFA<T> implements Serializable {
 
 		computationStates = new LinkedList<>();
 
+		final List<ComputationState<T>> readComputationStates = new ArrayList<>(numberComputationStates);
+
+		boolean afterMigration = false;
 		for (int i = 0; i < numberComputationStates; i++) {
 			ComputationState<T> computationState = readComputationState(ois);
+			if (computationState.getState().getName().equals(BEGINNING_STATE_NAME)) {
+				afterMigration = true;
+			}
 
-			computationStates.offer(computationState);
+			readComputationStates.add(computationState);
+		}
+
+		if (afterMigration && !readComputationStates.isEmpty()) {
+			try {
+				//Backwards compatibility
+				this.computationStates.addAll(migrateNFA(readComputationStates));
+				final Field newSharedBufferField = NFA.class.getDeclaredField("stringSharedBuffer");
+				final Field sharedBufferField = NFA.class.getDeclaredField("sharedBuffer");
+				sharedBufferField.setAccessible(true);
+				newSharedBufferField.setAccessible(true);
+				newSharedBufferField.set(this, SharedBuffer.migrateSharedBuffer(this.sharedBuffer));
+				sharedBufferField.set(this, null);
+				sharedBufferField.setAccessible(false);
+				newSharedBufferField.setAccessible(false);
+			} catch (Exception e) {
+				throw new IllegalStateException("Could not migrate from earlier version", e);
+			}
+		} else {
+			this.computationStates.addAll(readComputationStates);
 		}
 
 		nonDuplicatingTypeSerializer.clearReferences();
 	}
 
+	/**
+	 * Needed for backward compatibility. First migrates the {@link State} graph see {@link NFACompiler#migrateGraph(State)}.
+	 * Than recreates the {@link ComputationState}s with the new {@link State} graph.
+	 * @param readStates computation states read from snapshot
+	 * @return collection of migrated computation states
+	 */
+	private Collection<ComputationState<T>> migrateNFA(Collection<ComputationState<T>> readStates) {
+		final ArrayList<ComputationState<T>> computationStates = new ArrayList<>();
+
+		final State<T> startState = Iterators.find(
+			readStates.iterator(),
+			new Predicate<ComputationState<T>>() {
+				@Override
+				public boolean apply(@Nullable ComputationState<T> input) {
+					return input != null && input.getState().getName().equals(BEGINNING_STATE_NAME);
+				}
+			}).getState();
+
+		final Map<String, State<T>> convertedStates = NFACompiler.migrateGraph(startState);
+
+		for (ComputationState<T> readState : readStates) {
+			if (!readState.isStartState()) {
+				final String previousName = readState.getState().getName();
+				final String currentName = Iterators.find(
+					readState.getState().getStateTransitions().iterator(),
+					new Predicate<StateTransition<T>>() {
+						@Override
+						public boolean apply(@Nullable StateTransition<T> input) {
+							return input != null && input.getAction() == StateTransitionAction.TAKE;
+						}
+					}).getTargetState().getName();
+
+
+				final State<T> previousState = convertedStates.get(previousName);
+
+				computationStates.add(ComputationState.createState(
+					convertedStates.get(currentName),
+					previousState,
+					readState.getEvent(),
+					readState.getTimestamp(),
+					readState.getVersion(),
+					readState.getStartTimestamp()
+				));
+			}
+		}
+
+		final String startName = Iterators.find(convertedStates.values().iterator(), new Predicate<State<T>>() {
+			@Override
+			public boolean apply(@Nullable State<T> input) {
+				return input != null && input.isStart();
+			}
+		}).getName();
+
+		computationStates.add(ComputationState.createStartState(
+			convertedStates.get(startName),
+			new DeweyNumber(this.startEventCounter)));
+
+		this.states.clear();
+		this.states.addAll(convertedStates.values());
+
+		return computationStates;
+	}
+
 	private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException {
 		oos.writeObject(computationState.getState());
 		oos.writeObject(computationState.getPreviousState());
@@ -629,7 +740,13 @@ public class NFA<T> implements Serializable {
 	@SuppressWarnings("unchecked")
 	private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		final State<T> state = (State<T>)ois.readObject();
-		final State<T> previousState = (State<T>)ois.readObject();
+		State<T> previousState;
+		try {
+			previousState = (State<T>)ois.readObject();
+		} catch (OptionalDataException e) {
+			previousState = null;
+		}
+
 		final long timestamp = ois.readLong();
 		final DeweyNumber version = (DeweyNumber)ois.readObject();
 		final long startTimestamp = ois.readLong();
@@ -647,6 +764,7 @@ public class NFA<T> implements Serializable {
 		return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp);
 	}
 
+
 	/**
 	 * Generates a state name from a given name template and an index.
 	 * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index e6a8c75..d5b7876 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.LinkedHashMultimap;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -463,6 +464,54 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		}
 	}
 
+	private SharedBuffer(
+		TypeSerializer<V> valueSerializer,
+		Map<K, SharedBufferPage<K, V>> pages) {
+		this.valueSerializer = valueSerializer;
+		this.pages = pages;
+	}
+
+	/**
+	 * For backward compatibility only. Previously the key in {@link SharedBuffer} was {@link State}.
+	 * Now it is {@link String}.
+	 */
+	@Internal
+	static <T> SharedBuffer<String, T> migrateSharedBuffer(SharedBuffer<State<T>, T> buffer) {
+
+		final Map<String, SharedBufferPage<String, T>> pageMap = new HashMap<>();
+		final Map<SharedBufferEntry<State<T>, T>, SharedBufferEntry<String, T>> entries = new HashMap<>();
+
+		for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
+			final SharedBufferPage<String, T> newPage = new SharedBufferPage<>(page.getKey().getName());
+			pageMap.put(newPage.getKey(), newPage);
+
+			for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
+				final SharedBufferEntry<String, T> newSharedBufferEntry = new SharedBufferEntry<>(
+					pageEntry.getKey(),
+					newPage);
+				newSharedBufferEntry.referenceCounter = pageEntry.getValue().referenceCounter;
+				entries.put(pageEntry.getValue(), newSharedBufferEntry);
+				newPage.entries.put(pageEntry.getKey(), newSharedBufferEntry);
+			}
+		}
+
+		for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
+			for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
+				final SharedBufferEntry<String, T> newEntry = entries.get(pageEntry.getValue());
+				for (SharedBufferEdge<State<T>, T> edge : pageEntry.getValue().edges) {
+					final SharedBufferEntry<String, T> targetNewEntry = entries.get(edge.getTarget());
+
+					final SharedBufferEdge<String, T> newEdge = new SharedBufferEdge<>(
+						targetNewEntry,
+						edge.getVersion());
+					newEntry.edges.add(newEdge);
+				}
+			}
+		}
+
+		return new SharedBuffer<>(buffer.valueSerializer, pageMap);
+	}
+
 	private SharedBufferEntry<K, V> get(
 		final K key,
 		final V value,

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 7bcb6ea..27e0dcd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -20,9 +20,12 @@ package org.apache.flink.cep.nfa;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -62,7 +65,6 @@ public class State<T> implements Serializable {
 		return stateTransitions;
 	}
 
-
 	private void addStateTransition(
 		final StateTransitionAction action,
 		final State<T> targetState,
@@ -132,4 +134,19 @@ public class State<T> implements Serializable {
 		Final, // the state is a final state for the NFA
 		Normal // the state is neither a start nor a final state
 	}
+
+	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+		ois.defaultReadObject();
+
+		//Backward compatibility. Previous version of StateTransition did not have source state
+		if (!stateTransitions.isEmpty() && stateTransitions.iterator().next().getSourceState() == null) {
+			final List<StateTransition<T>> tmp = new ArrayList<>();
+			tmp.addAll(this.stateTransitions);
+
+			this.stateTransitions.clear();
+			for (StateTransition<T> transition : tmp) {
+				addStateTransition(transition.getAction(), transition.getTargetState(), transition.getCondition());
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index b476c49..8bd8612 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,10 +18,15 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
+import org.apache.flink.cep.nfa.StateTransitionAction;
 import org.apache.flink.cep.pattern.FilterFunctions;
 import org.apache.flink.cep.pattern.FollowedByPattern;
 import org.apache.flink.cep.pattern.MalformedPatternException;
@@ -31,12 +36,15 @@ import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
+import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -363,6 +371,114 @@ public class NFACompiler {
 	}
 
 	/**
+	 * Used for migrating CEP graphs prior to 1.3. It removes the dummy start, adds the dummy end, and translates all
+	 * states to consuming ones by moving all TAKEs and IGNOREs to the next state. This method assumes each state
+	 * has at most one TAKE and one IGNORE and name of each state is unique. No PROCEED transition is allowed!
+	 *
+	 * @param oldStartState dummy start state of old graph
+	 * @param <T> type of events
+	 * @return map of new states, where key is the name of a state and value is the state itself
+	 */
+	@Internal
+	public static <T> Map<String, State<T>> migrateGraph(State<T> oldStartState) {
+		State<T> oldFirst = oldStartState;
+		State<T> oldSecond = oldStartState.getStateTransitions().iterator().next().getTargetState();
+
+		StateTransition<T> oldFirstToSecondTake = Iterators.find(
+			oldFirst.getStateTransitions().iterator(),
+			new Predicate<StateTransition<T>>() {
+				@Override
+				public boolean apply(@Nullable StateTransition<T> input) {
+					return input != null && input.getAction() == StateTransitionAction.TAKE;
+				}
+
+			});
+
+		StateTransition<T> oldFirstIgnore = Iterators.find(
+			oldFirst.getStateTransitions().iterator(),
+			new Predicate<StateTransition<T>>() {
+				@Override
+				public boolean apply(@Nullable StateTransition<T> input) {
+					return input != null && input.getAction() == StateTransitionAction.IGNORE;
+				}
+
+			}, null);
+
+		StateTransition<T> oldSecondToThirdTake = Iterators.find(
+			oldSecond.getStateTransitions().iterator(),
+			new Predicate<StateTransition<T>>() {
+				@Override
+				public boolean apply(@Nullable StateTransition<T> input) {
+					return input != null && input.getAction() == StateTransitionAction.TAKE;
+				}
+
+			}, null);
+
+		final Map<String, State<T>> convertedStates = new HashMap<>();
+		State<T> newSecond;
+		State<T> newFirst = new State<>(oldSecond.getName(), State.StateType.Start);
+		convertedStates.put(newFirst.getName(), newFirst);
+		while (oldSecondToThirdTake != null) {
+
+			newSecond = new State<T>(oldSecondToThirdTake.getTargetState().getName(), State.StateType.Normal);
+			convertedStates.put(newSecond.getName(), newSecond);
+			newFirst.addTake(newSecond, oldFirstToSecondTake.getCondition());
+
+			if (oldFirstIgnore != null) {
+				newFirst.addIgnore(oldFirstIgnore.getCondition());
+			}
+
+			oldFirst = oldSecond;
+
+			oldFirstToSecondTake = Iterators.find(
+				oldFirst.getStateTransitions().iterator(),
+				new Predicate<StateTransition<T>>() {
+					@Override
+					public boolean apply(@Nullable StateTransition<T> input) {
+						return input != null && input.getAction() == StateTransitionAction.TAKE;
+					}
+
+				});
+
+			oldFirstIgnore = Iterators.find(
+				oldFirst.getStateTransitions().iterator(),
+				new Predicate<StateTransition<T>>() {
+					@Override
+					public boolean apply(@Nullable StateTransition<T> input) {
+						return input != null && input.getAction() == StateTransitionAction.IGNORE;
+					}
+
+				}, null);
+
+			oldSecond = oldSecondToThirdTake.getTargetState();
+
+			oldSecondToThirdTake = Iterators.find(
+				oldSecond.getStateTransitions().iterator(),
+				new Predicate<StateTransition<T>>() {
+					@Override
+					public boolean apply(@Nullable StateTransition<T> input) {
+						return input != null && input.getAction() == StateTransitionAction.TAKE;
+					}
+
+				}, null);
+
+			newFirst = newSecond;
+		}
+
+		final State<T> endingState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
+
+		newFirst.addTake(endingState, oldFirstToSecondTake.getCondition());
+
+		if (oldFirstIgnore != null) {
+			newFirst.addIgnore(oldFirstIgnore.getCondition());
+		}
+
+		convertedStates.put(endingState.getName(), endingState);
+
+		return convertedStates;
+	}
+
+	/**
 	 * Factory interface for {@link NFA}.
 	 *
 	 * @param <T> Type of the input events which are processed by the NFA

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 825ba957..5b05f19 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class NFAITCase extends TestLogger {
 
@@ -421,7 +422,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testComplexBranchingAfterKleeneStar() {
+	public void testComplexBranchingAfterZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -519,7 +520,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testKleeneStar() {
+	public void testZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -581,7 +582,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testEagerKleeneStar() {
+	public void testEagerZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -646,7 +647,7 @@ public class NFAITCase extends TestLogger {
 
 
 	@Test
-	public void testBeginWithKleeneStar() {
+	public void testBeginWithZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event middleEvent1 = new Event(40, "a", 2.0);
@@ -704,7 +705,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testKleeneStarAfterKleeneStar() {
+	public void testZeroOrMoreAfterZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -779,7 +780,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testKleeneStarAfterBranching() {
+	public void testZeroOrMoreAfterBranching() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -865,7 +866,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testStrictContinuityNoResultsAfterKleeneStar() {
+	public void testStrictContinuityNoResultsAfterZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event start = new Event(40, "d", 2.0);
@@ -923,7 +924,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testStrictContinuityResultsAfterKleeneStar() {
+	public void testStrictContinuityResultsAfterZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event start = new Event(40, "d", 2.0);
@@ -1663,4 +1664,181 @@ public class NFAITCase extends TestLogger {
 		), resultingPatterns);
 	}
 
+	/**
+	 * Clearing SharedBuffer
+	 */
+
+	@Test
+	public void testTimesClearingBuffer() {
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).next("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).within(Time.milliseconds(8));
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		nfa.process(startEvent, 1);
+		nfa.process(middleEvent1, 2);
+		nfa.process(middleEvent2, 3);
+		nfa.process(middleEvent3, 4);
+		nfa.process(end1, 6);
+
+		//pruning element
+		nfa.process(null, 10);
+
+		assertEquals(true, nfa.isEmpty());
+	}
+
+	@Test
+	public void testOptionalClearingBuffer() {
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).optional().followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).within(Time.milliseconds(8));
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		nfa.process(startEvent, 1);
+		nfa.process(middleEvent, 5);
+		nfa.process(end1, 6);
+
+		//pruning element
+		nfa.process(null, 10);
+
+		assertEquals(true, nfa.isEmpty());
+	}
+
+	@Test
+	public void testAtLeastOneClearingBuffer() {
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).within(Time.milliseconds(8));
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		nfa.process(startEvent, 1);
+		nfa.process(middleEvent1, 3);
+		nfa.process(middleEvent2, 4);
+		nfa.process(end1, 6);
+
+		//pruning element
+		nfa.process(null, 10);
+
+		assertEquals(true, nfa.isEmpty());
+	}
+
+
+	@Test
+	public void testZeroOrMoreClearingBuffer() {
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).within(Time.milliseconds(8));
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		nfa.process(startEvent, 1);
+		nfa.process(middleEvent1, 3);
+		nfa.process(middleEvent2, 4);
+		nfa.process(end1, 6);
+
+		//pruning element
+		nfa.process(null, 10);
+
+		assertEquals(true, nfa.isEmpty());
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
new file mode 100644
index 0000000..65fa733
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CEPMigration12to13Test {
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = CEPMigration12to13Test.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		if (resource == null) {
+			throw new NullPointerException("Missing snapshot resource.");
+		}
+		return resource.getFile();
+	}
+
+	@Test
+	public void testMigrationAfterBranchingPattern() throws Exception {
+
+		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+
+		final Event startEvent = new Event(42, "start", 1.0);
+		final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+		final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
+		final Event endEvent = new Event(42, "end", 1.0);
+
+		// uncomment these lines for regenerating the snapshot on Flink 1.2
+//		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+//			new KeyedOneInputStreamOperatorTestHarness<>(
+//				new KeyedCEPPatternOperator<>(
+//					Event.createTypeSerializer(),
+//					false,
+//					keySelector,
+//					IntSerializer.INSTANCE,
+//					new NFAFactory(),
+//					true),
+//				keySelector,
+//				BasicTypeInfo.INT_TYPE_INFO);
+//
+//		harness.setup();
+//		harness.open();
+//		harness.processElement(new StreamRecord<Event>(startEvent, 1));
+//		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+//		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+//		harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+//		harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+//		harness.processWatermark(new Watermark(5));
+//		// simulate snapshot/restore with empty element queue but NFA state
+//		OperatorStateHandles snapshot = harness.snapshot(1, 1);
+//		FileOutputStream out = new FileOutputStream(
+//				"src/test/resources/cep-branching-snapshot-1.2");
+//		ObjectOutputStream oos = new ObjectOutputStream(out);
+//		oos.writeObject(snapshot.getOperatorChainIndex());
+//		oos.writeObject(snapshot.getLegacyOperatorState());
+//		oos.writeObject(snapshot.getManagedKeyedState());
+//		oos.writeObject(snapshot.getRawKeyedState());
+//		oos.writeObject(snapshot.getManagedOperatorState());
+//		oos.writeObject(snapshot.getRawOperatorState());
+//		out.close();
+//		harness.close();
+
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+					Event.createTypeSerializer(),
+					false,
+					keySelector,
+					IntSerializer.INSTANCE,
+					new NFAFactory(),
+					true),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.setup();
+		final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+			"cep-branching-snapshot-1.2")));
+		final OperatorStateHandles snapshot = new OperatorStateHandles(
+			(int) ois.readObject(),
+			(StreamStateHandle) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject()
+		);
+		harness.initializeState(snapshot);
+		harness.open();
+
+		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+		harness.processElement(new StreamRecord<>(endEvent, 5));
+
+		harness.processWatermark(new Watermark(20));
+
+		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+		// watermark and 2 results
+		assertEquals(3, result.size());
+
+		Object resultObject1 = result.poll();
+		assertTrue(resultObject1 instanceof StreamRecord);
+		StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+		assertTrue(resultRecord1.getValue() instanceof Map);
+
+		Object resultObject2 = result.poll();
+		assertTrue(resultObject2 instanceof StreamRecord);
+		StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+		assertTrue(resultRecord2.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+
+		assertEquals(startEvent, patternMap1.get("start"));
+		assertEquals(middleEvent1, patternMap1.get("middle"));
+		assertEquals(endEvent, patternMap1.get("end"));
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+
+		assertEquals(startEvent, patternMap2.get("start"));
+		assertEquals(middleEvent2, patternMap2.get("middle"));
+		assertEquals(endEvent, patternMap2.get("end"));
+
+		harness.close();
+	}
+
+	@Test
+	public void testStartingNewPatternAfterMigration() throws Exception {
+
+		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+
+		final Event startEvent1 = new Event(42, "start", 1.0);
+		final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+		final Event startEvent2 = new Event(42, "start", 5.0);
+		final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
+		final Event endEvent = new Event(42, "end", 1.0);
+
+		// uncomment these lines for regenerating the snapshot on Flink 1.2
+//		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+//			new KeyedOneInputStreamOperatorTestHarness<>(
+//				new KeyedCEPPatternOperator<>(
+//					Event.createTypeSerializer(),
+//					false,
+//					keySelector,
+//					IntSerializer.INSTANCE,
+//					new NFAFactory(),
+//					true),
+//				keySelector,
+//				BasicTypeInfo.INT_TYPE_INFO);
+//
+//		harness.setup();
+//		harness.open();
+//		harness.processElement(new StreamRecord<Event>(startEvent1, 1));
+//		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+//		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+//		harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+//		harness.processWatermark(new Watermark(5));
+//		// simulate snapshot/restore with empty element queue but NFA state
+//		OperatorStateHandles snapshot = harness.snapshot(1, 1);
+//		FileOutputStream out = new FileOutputStream(
+//				"src/test/resources/cep-starting-snapshot-1.2");
+//		ObjectOutputStream oos = new ObjectOutputStream(out);
+//		oos.writeObject(snapshot.getOperatorChainIndex());
+//		oos.writeObject(snapshot.getLegacyOperatorState());
+//		oos.writeObject(snapshot.getManagedKeyedState());
+//		oos.writeObject(snapshot.getRawKeyedState());
+//		oos.writeObject(snapshot.getManagedOperatorState());
+//		oos.writeObject(snapshot.getRawOperatorState());
+//		out.close();
+//		harness.close();
+
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+					Event.createTypeSerializer(),
+					false,
+					keySelector,
+					IntSerializer.INSTANCE,
+					new NFAFactory(),
+					true),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.setup();
+		final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+			"cep-starting-snapshot-1.2")));
+		final OperatorStateHandles snapshot = new OperatorStateHandles(
+			(int) ois.readObject(),
+			(StreamStateHandle) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject()
+		);
+		harness.initializeState(snapshot);
+		harness.open();
+
+		harness.processElement(new StreamRecord<>(startEvent2, 5));
+		harness.processElement(new StreamRecord<Event>(middleEvent2, 6));
+		harness.processElement(new StreamRecord<>(endEvent, 7));
+
+		harness.processWatermark(new Watermark(20));
+
+		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+		// watermark and 3 results
+		assertEquals(4, result.size());
+
+		Object resultObject1 = result.poll();
+		assertTrue(resultObject1 instanceof StreamRecord);
+		StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+		assertTrue(resultRecord1.getValue() instanceof Map);
+
+		Object resultObject2 = result.poll();
+		assertTrue(resultObject2 instanceof StreamRecord);
+		StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+		assertTrue(resultRecord2.getValue() instanceof Map);
+
+		Object resultObject3 = result.poll();
+		assertTrue(resultObject3 instanceof StreamRecord);
+		StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+		assertTrue(resultRecord3.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+
+		assertEquals(startEvent1, patternMap1.get("start"));
+		assertEquals(middleEvent1, patternMap1.get("middle"));
+		assertEquals(endEvent, patternMap1.get("end"));
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+
+		assertEquals(startEvent1, patternMap2.get("start"));
+		assertEquals(middleEvent2, patternMap2.get("middle"));
+		assertEquals(endEvent, patternMap2.get("end"));
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap3 = (Map<String, Event>) resultRecord3.getValue();
+
+		assertEquals(startEvent2, patternMap3.get("start"));
+		assertEquals(middleEvent2, patternMap3.get("middle"));
+		assertEquals(endEvent, patternMap3.get("end"));
+
+		harness.close();
+	}
+
+	@Test
+	public void testSinglePatternAfterMigration() throws Exception {
+
+		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+
+		final Event startEvent1 = new Event(42, "start", 1.0);
+
+		// uncomment these lines for regenerating the snapshot on Flink 1.2
+//		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+//			new KeyedOneInputStreamOperatorTestHarness<>(
+//				new KeyedCEPPatternOperator<>(
+//					Event.createTypeSerializer(),
+//					false,
+//					keySelector,
+//					IntSerializer.INSTANCE,
+//					new SinglePatternNFAFactory(),
+//					true),
+//				keySelector,
+//				BasicTypeInfo.INT_TYPE_INFO);
+//
+//		harness.setup();
+//		harness.open();
+//		harness.processWatermark(new Watermark(5));
+//		// simulate snapshot/restore with empty element queue but NFA state
+//		OperatorStateHandles snapshot = harness.snapshot(1, 1);
+//		FileOutputStream out = new FileOutputStream(
+//				"src/test/resources/cep-single-pattern-snapshot-1.2");
+//		ObjectOutputStream oos = new ObjectOutputStream(out);
+//		oos.writeObject(snapshot.getOperatorChainIndex());
+//		oos.writeObject(snapshot.getLegacyOperatorState());
+//		oos.writeObject(snapshot.getManagedKeyedState());
+//		oos.writeObject(snapshot.getRawKeyedState());
+//		oos.writeObject(snapshot.getManagedOperatorState());
+//		oos.writeObject(snapshot.getRawOperatorState());
+//		out.close();
+//		harness.close();
+
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+					Event.createTypeSerializer(),
+					false,
+					keySelector,
+					IntSerializer.INSTANCE,
+					new SinglePatternNFAFactory(),
+					true),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.setup();
+		final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+			"cep-single-pattern-snapshot-1.2")));
+		final OperatorStateHandles snapshot = new OperatorStateHandles(
+			(int) ois.readObject(),
+			(StreamStateHandle) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject()
+		);
+		harness.initializeState(snapshot);
+		harness.open();
+
+		harness.processElement(new StreamRecord<>(startEvent1, 5));
+
+		harness.processWatermark(new Watermark(20));
+
+		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+		// watermark and the result
+		assertEquals(2, result.size());
+
+		Object resultObject = result.poll();
+		assertTrue(resultObject instanceof StreamRecord);
+		StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+		assertTrue(resultRecord.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+		assertEquals(startEvent1, patternMap.get("start"));
+
+		harness.close();
+	}
+
+	private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory<Event> {
+
+		private static final long serialVersionUID = 1173020762472766713L;
+
+		private final boolean handleTimeout;
+
+		private SinglePatternNFAFactory() {
+			this(false);
+		}
+
+		private SinglePatternNFAFactory(boolean handleTimeout) {
+			this.handleTimeout = handleTimeout;
+		}
+
+		@Override
+		public NFA<Event> createNFA() {
+
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
+				.within(Time.milliseconds(10L));
+
+			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+		}
+	}
+
+	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
+
+		private static final long serialVersionUID = 1173020762472766713L;
+
+		private final boolean handleTimeout;
+
+		private NFAFactory() {
+			this(false);
+		}
+
+		private NFAFactory(boolean handleTimeout) {
+			this.handleTimeout = handleTimeout;
+		}
+
+		@Override
+		public NFA<Event> createNFA() {
+
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
+				.followedBy("middle")
+				.subtype(SubEvent.class)
+				.where(new MiddleFilter())
+				.followedBy("end")
+				.where(new EndFilter())
+				// add a window timeout to test whether timestamps of elements in the
+				// priority queue in CEP operator are correctly checkpointed/restored
+				.within(Time.milliseconds(10L));
+
+			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+		}
+	}
+
+	private static class StartFilter implements FilterFunction<Event> {
+		private static final long serialVersionUID = 5726188262756267490L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getName().equals("start");
+		}
+	}
+
+	private static class MiddleFilter implements FilterFunction<SubEvent> {
+		private static final long serialVersionUID = 6215754202506583964L;
+
+		@Override
+		public boolean filter(SubEvent value) throws Exception {
+			return value.getVolume() > 5.0;
+		}
+	}
+
+	private static class EndFilter implements FilterFunction<Event> {
+		private static final long serialVersionUID = 7056763917392056548L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getName().equals("end");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
new file mode 100644
index 0000000..47f710e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
new file mode 100644
index 0000000..255f46a
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
new file mode 100644
index 0000000..c41f6c2
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 differ


[3/3] flink git commit: [FLINK-3318] Add support for quantifiers to CEP's pattern API

Posted by kk...@apache.org.
[FLINK-3318] Add support for quantifiers to CEP's pattern API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9001c4ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9001c4ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9001c4ef

Branch: refs/heads/master
Commit: 9001c4ef82a7a04d821252ac62bb7809a931c98a
Parents: d0695c0
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Sat Mar 18 20:53:00 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Thu Mar 23 10:47:55 2017 +0100

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |   78 +-
 .../flink/cep/scala/pattern/Pattern.scala       |   81 +-
 .../apache/flink/cep/nfa/ComputationState.java  |   44 +-
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |   17 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  398 ++++--
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |   25 +-
 .../java/org/apache/flink/cep/nfa/State.java    |   32 +-
 .../apache/flink/cep/nfa/StateTransition.java   |   20 +-
 .../flink/cep/nfa/StateTransitionAction.java    |    4 +-
 .../nfa/compiler/MalformedPatternException.java |   32 -
 .../flink/cep/nfa/compiler/NFACompiler.java     |  317 ++++-
 .../flink/cep/pattern/FilterFunctions.java      |   44 +
 .../cep/pattern/MalformedPatternException.java  |   32 +
 .../flink/cep/pattern/NotFilterFunction.java    |   42 +
 .../org/apache/flink/cep/pattern/Pattern.java   |  115 ++
 .../apache/flink/cep/pattern/Quantifier.java    |   54 +
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 1338 +++++++++++++++++-
 .../java/org/apache/flink/cep/nfa/NFATest.java  |   90 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java |  152 +-
 .../apache/flink/cep/pattern/PatternTest.java   |   54 +
 20 files changed, 2595 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 8047481..22cffbc 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -341,7 +341,45 @@ patternState.subtype(SubEvent.class);
 patternState.within(Time.seconds(10));
 {% endhighlight %}
           </td>
-      </tr>
+       </tr>
+       <tr>
+          <td><strong>ZeroOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or more times(kleene star). This means any number of events can be matched in this state.</p>
+              <p>If eagerness is enabled(by default) for a pattern A*B and sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.</p>
+      {% highlight java %}
+      patternState.zeroOrMore();
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>OneOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur one or more times(kleene star). This means at least one and at most infinite number of events can be matched in this state.</p>
+              <p>If eagerness is enabled (by default) for a pattern A*B and sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.</p>
+      {% highlight java %}
+      patternState.oneOrMore();
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Optional</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or once.</p>
+      {% highlight java %}
+      patternState.optional();
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Times</strong></td>
+          <td>
+              <p>Specifies exact number of times that this pattern should be matched.</p>
+      {% highlight java %}
+      patternState.times(2);
+      {% endhighlight %}
+          </td>
+       </tr>
   </tbody>
 </table>
 </div>
@@ -419,6 +457,44 @@ patternState.within(Time.seconds(10))
 {% endhighlight %}
           </td>
       </tr>
+       <tr>
+          <td><strong>ZeroOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or more times(kleene star). This means any number of events can be matched in this state.</p>
+              <p>If eagerness is enabled(by default) for a pattern A*B and sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.</p>
+      {% highlight scala %}
+      patternState.zeroOrMore()
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>OneOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur one or more times(kleene star). This means at least one and at most infinite number of events can be matched in this state.</p>
+              <p>If eagerness is enabled (by default) for a pattern A*B and sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.</p>
+      {% highlight scala %}
+      patternState.oneOrMore()
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Optional</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or once.</p>
+      {% highlight scala %}
+      patternState.optional()
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Times</strong></td>
+          <td>
+              <p>Specifies exact number of times that this pattern should be matched.</p>
+      {% highlight scala %}
+      patternState.times(2)
+      {% endhighlight %}
+          </td>
+       </tr>
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index cc3b03c..5baf780 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -19,7 +19,7 @@ package org.apache.flink.cep.scala.pattern
 
 import org.apache.flink.api.common.functions.FilterFunction
 import org.apache.flink.cep
-import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.cep.pattern.{Quantifier, Pattern => JPattern}
 import org.apache.flink.streaming.api.windowing.time.Time
 
 /**
@@ -59,6 +59,12 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
 
   /**
     *
+    * @return currently applied quantifier to this pattern
+    */
+  def getQuantifier: Quantifier = jPattern.getQuantifier
+
+  /**
+    *
     * @return Filter condition for an event to be matched
     */
   def getFilterFunction(): Option[FilterFunction[F]] = {
@@ -160,6 +166,79 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     wrapPattern(jPattern.getPrevious())
   }
 
+  /**
+    * Specifies that this pattern can occur zero or more times(kleene star).
+    * This means any number of events can be matched in this state.
+    *
+    * @return The same pattern with applied Kleene star operator
+    */
+  def zeroOrMore: Pattern[T, F] = {
+    jPattern.zeroOrMore()
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur zero or more times(kleene star).
+    * This means any number of events can be matched in this state.
+    *
+    * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will generate patterns:
+    * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.
+    *
+    * @param eager if true the pattern always consumes earlier events
+    * @return The same pattern with applied Kleene star operator
+    */
+  def zeroOrMore(eager: Boolean): Pattern[T, F] = {
+    jPattern.zeroOrMore(eager)
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur one or more times(kleene star).
+    * This means at least one and at most infinite number of events can be matched in this state.
+    *
+    * @return The same pattern with applied Kleene plus operator
+    */
+  def oneOrMore: Pattern[T, F] = {
+    jPattern.oneOrMore()
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur one or more times(kleene star).
+    * This means at least one and at most infinite number of events can be matched in this state.
+    *
+    * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will generate patterns:
+    * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.
+    *
+    * @param eager if true the pattern always consumes earlier events
+    * @return The same pattern with applied Kleene plus operator
+    */
+  def oneOrMore(eager: Boolean): Pattern[T, F] = {
+    jPattern.oneOrMore(eager)
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur zero or once.
+    *
+    * @return The same pattern with applied Kleene ? operator
+    */
+  def optional: Pattern[T, F] = {
+    jPattern.optional()
+    this
+  }
+
+  /**
+    * Specifies exact number of times that this pattern should be matched.
+    *
+    * @param times number of times matching event must appear
+    * @return The same pattern with number of times applied
+    */
+  def times(times: Int): Pattern[T, F] = {
+    jPattern.times(times)
+    this
+  }
+
 }
 
 object Pattern {

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 3f44fba..445d038 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.util.Preconditions;
+
 /**
  * Helper class which encapsulates the state of the NFA computation. It points to the current state,
  * the last taken event, its occurrence timestamp, the current version and the starting timestamp
@@ -41,17 +43,21 @@ public class ComputationState<T> {
 	// Timestamp of the first element in the pattern
 	private final long startTimestamp;
 
-	public ComputationState(
-		final State<T> currentState,
-		final T event,
-		final long timestamp,
-		final DeweyNumber version,
-		final long startTimestamp) {
+	private final State<T> previousState;
+
+	private ComputationState(
+			final State<T> currentState,
+			final State<T> previousState,
+			final T event,
+			final long timestamp,
+			final DeweyNumber version,
+			final long startTimestamp) {
 		this.state = currentState;
 		this.event = event;
 		this.timestamp = timestamp;
 		this.version = version;
 		this.startTimestamp = startTimestamp;
+		this.previousState = previousState;
 	}
 
 	public boolean isFinalState() {
@@ -59,7 +65,7 @@ public class ComputationState<T> {
 	}
 
 	public boolean isStartState() {
-		return state.isStart();
+		return state.isStart() && event == null;
 	}
 
 	public long getTimestamp() {
@@ -74,6 +80,10 @@ public class ComputationState<T> {
 		return state;
 	}
 
+	public State<T> getPreviousState() {
+		return previousState;
+	}
+
 	public T getEvent() {
 		return event;
 	}
@@ -81,4 +91,24 @@ public class ComputationState<T> {
 	public DeweyNumber getVersion() {
 		return version;
 	}
+
+	public static <T> ComputationState<T> createStartState(final State<T> state) {
+		Preconditions.checkArgument(state.isStart());
+		return new ComputationState<>(state, null, null, -1L, new DeweyNumber(1), -1L);
+	}
+
+	public static <T> ComputationState<T> createStartState(final State<T> state, final DeweyNumber version) {
+		Preconditions.checkArgument(state.isStart());
+		return new ComputationState<>(state, null, null, -1L, version, -1L);
+	}
+
+	public static <T> ComputationState<T> createState(
+			final State<T> currentState,
+			final State<T> previousState,
+			final T event,
+			final long timestamp,
+			final DeweyNumber version,
+			final long startTimestamp) {
+		return new ComputationState<>(currentState, previousState, event, timestamp, version, startTimestamp);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index bb9039d..fd3fafa 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -44,6 +44,10 @@ public class DeweyNumber implements Serializable {
 		this.deweyNumber = deweyNumber;
 	}
 
+	public DeweyNumber(DeweyNumber number) {
+		this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length);
+	}
+
 	/**
 	 * Checks whether this dewey number is compatible to the other dewey number.
 	 *
@@ -90,8 +94,19 @@ public class DeweyNumber implements Serializable {
 	 * @return A new dewey number derived from this whose last digit is increased by one
 	 */
 	public DeweyNumber increase() {
+		return increase(1);
+	}
+
+	/**
+	 * Creates a new dewey number from this such that its last digit is increased by the supplied
+	 * number
+	 *
+	 * @param times how many times to increase the Dewey number
+	 * @return A new dewey number derived from this whose last digit is increased by given number
+	 */
+	public DeweyNumber increase(int times) {
 		int[] newDeweyNumber = Arrays.copyOf(deweyNumber, deweyNumber.length);
-		newDeweyNumber[deweyNumber.length - 1]++;
+		newDeweyNumber[deweyNumber.length - 1] += times;
 
 		return new DeweyNumber(newDeweyNumber);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 257418a..3d42248 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.LinkedHashMultimap;
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -39,7 +40,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -87,7 +87,7 @@ public class NFA<T> implements Serializable {
 	/**
 	 * 	Buffer used to store the matched events.
 	 */
-	private final SharedBuffer<State<T>, T> sharedBuffer;
+	private final SharedBuffer<String, T> sharedBuffer;
 
 	/**
 	 * A set of all the valid NFA states, as returned by the
@@ -98,7 +98,7 @@ public class NFA<T> implements Serializable {
 
 	/**
 	 * The length of a windowed pattern, as specified using the
-	 * {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)}
+	 * {@link org.apache.flink.cep.pattern.Pattern#within(Time)}  Pattern.within(Time)}
 	 * method.
 	 */
 	private final long windowTime;
@@ -109,9 +109,6 @@ public class NFA<T> implements Serializable {
 	 */
 	private final boolean handleTimeout;
 
-	// Current starting index for the next dewey version number
-	private int startEventCounter;
-
 	/**
 	 * Current set of {@link ComputationState computation states} within the state machine.
 	 * These are the "active" intermediate states that are waiting for new matching
@@ -119,8 +116,6 @@ public class NFA<T> implements Serializable {
 	 */
 	private transient Queue<ComputationState<T>> computationStates;
 
-	private StateTransitionComparator<T>  stateTransitionComparator;
-
 	public NFA(
 			final TypeSerializer<T> eventSerializer,
 			final long windowTime,
@@ -129,11 +124,10 @@ public class NFA<T> implements Serializable {
 		this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
 		this.windowTime = windowTime;
 		this.handleTimeout = handleTimeout;
-		this.sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
-		this.computationStates = new LinkedList<>();
-		this.states = new HashSet<>();
-		this.startEventCounter = 1;
-		this.stateTransitionComparator =  new StateTransitionComparator<>();
+		sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+		computationStates = new LinkedList<>();
+
+		states = new HashSet<>();
 	}
 
 	public Set<State<T>> getStates() {
@@ -150,7 +144,7 @@ public class NFA<T> implements Serializable {
 		states.add(state);
 
 		if (state.isStart()) {
-			computationStates.add(new ComputationState<>(state, null, -1L, null, -1L));
+			computationStates.add(ComputationState.createStartState(state));
 		}
 	}
 
@@ -201,8 +195,8 @@ public class NFA<T> implements Serializable {
 				}
 
 				// remove computation state which has exceeded the window length
-				sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
-				sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
+				sharedBuffer.release(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
+				sharedBuffer.remove(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
 
 				newComputationStates = Collections.emptyList();
 			} else if (event != null) {
@@ -218,8 +212,8 @@ public class NFA<T> implements Serializable {
 					result.addAll(matches);
 
 					// remove found patterns because they are no longer needed
-					sharedBuffer.release(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp());
-					sharedBuffer.remove(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp());
+					sharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+					sharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
 				} else {
 					// add new computation state; it will be processed once the next event arrives
 					computationStates.add(newComputationState);
@@ -252,8 +246,7 @@ public class NFA<T> implements Serializable {
 			return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
 				sharedBuffer.equals(other.sharedBuffer) &&
 				states.equals(other.states) &&
-				windowTime == other.windowTime &&
-				startEventCounter == other.startEventCounter;
+				windowTime == other.windowTime;
 		} else {
 			return false;
 		}
@@ -261,12 +254,80 @@ public class NFA<T> implements Serializable {
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime, startEventCounter);
+		return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime);
+	}
+
+	private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
+		return s1.getName().equals(s2.getName());
 	}
 
 	/**
+	 * Class for storing resolved transitions. It counts at insert time the number of
+	 * branching transitions both for IGNORE and TAKE actions.
+ 	 */
+	private static class OutgoingEdges<T> {
+		private List<StateTransition<T>> edges = new ArrayList<>();
+
+		private final State<T> currentState;
+
+		private int totalTakeBranches = 0;
+		private int totalIgnoreBranches = 0;
+
+		OutgoingEdges(final State<T> currentState) {
+			this.currentState = currentState;
+		}
+
+		void add(StateTransition<T> edge) {
+
+			if (!isSelfIgnore(edge)) {
+				if (edge.getAction() == StateTransitionAction.IGNORE) {
+					totalIgnoreBranches++;
+				} else if (edge.getAction() == StateTransitionAction.TAKE) {
+					totalTakeBranches++;
+				}
+			}
+
+			edges.add(edge);
+		}
+
+		int getTotalIgnoreBranches() {
+			return totalIgnoreBranches;
+		}
+		int getTotalTakeBranches() {
+			return totalTakeBranches;
+		}
+
+		List<StateTransition<T>> getEdges() {
+			return edges;
+		}
+
+		private boolean isSelfIgnore(final StateTransition<T> edge) {
+			return isEquivalentState(edge.getTargetState(), currentState) &&
+				edge.getAction() == StateTransitionAction.IGNORE;
+		}
+	}
+
+
+	/**
 	 * Computes the next computation states based on the given computation state, the current event,
-	 * its timestamp and the internal state machine.
+	 * its timestamp and the internal state machine. The algorithm is:
+	 *
+	 * 1. Decide on valid transitions and number of branching paths. See {@link OutgoingEdges}
+	 * 2. Perform transitions:
+	 *      a) IGNORE (links in {@link SharedBuffer} will still point to the previous event)
+	 *          - do not perform for Start State - special case
+	 *          - if stays in the same state increase the current stage for future use with number of
+	 *            outgoing edges
+	 *          - if after PROCEED increase current stage and add new stage (as we change the state)
+	 *          - lock the entry in {@link SharedBuffer} as it is needed in the created branch
+	 *      b) TAKE (links in {@link SharedBuffer} will point to the current event)
+	 *          - add entry to the shared buffer with version of the current computation state
+	 *          - add stage and then increase with number of takes for the future computation states
+	 *          - peek to the next state if it has PROCEED path to a Final State, if true create
+	 *            Final ComputationState to emit results
+	 * 3. Handle the Start State, as it always have to remain
+	 * 4. Release the corresponding entries in {@link SharedBuffer}.
+	 *
 	 *
 	 * @param computationState Current computation state
 	 * @param event Current event which is processed
@@ -277,31 +338,179 @@ public class NFA<T> implements Serializable {
 			final ComputationState<T> computationState,
 			final T event,
 			final long timestamp) {
-		Stack<State<T>> states = new Stack<>();
-		List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
-		State<T> state = computationState.getState();
 
-		states.push(state);
+		final OutgoingEdges<T> outgoingEdges = createDecisionGraph(computationState, event);
+
+		// Create the computing version based on the previously computed edges
+		// We need to defer the creation of computation states until we know how many edges start
+		// at this computation state so that we can assign proper version
+		final List<StateTransition<T>> edges = outgoingEdges.getEdges();
+		int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
+		int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
+
+		final List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
+		for (StateTransition<T> edge : edges) {
+			switch (edge.getAction()) {
+				case IGNORE: {
+					if (!computationState.isStartState()) {
+						final DeweyNumber version;
+						if (isEquivalentState(edge.getTargetState(), computationState.getState())) {
+							//Stay in the same state (it can be either looping one or singleton)
+							final int toIncrease = calculateIncreasingSelfState(
+								outgoingEdges.getTotalIgnoreBranches(),
+								outgoingEdges.getTotalTakeBranches());
+							version = computationState.getVersion().increase(toIncrease);
+						} else {
+							//IGNORE after PROCEED
+							version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+							ignoreBranchesToVisit--;
+						}
 
-		boolean branched = false;
-		while (!states.isEmpty()) {
-			State<T> currentState = states.pop();
-			final List<StateTransition<T>> stateTransitions = new ArrayList<>(currentState.getStateTransitions());
+						resultingComputationStates.add(
+							ComputationState.createState(
+								edge.getTargetState(),
+								computationState.getPreviousState(),
+								computationState.getEvent(),
+								computationState.getTimestamp(),
+								version,
+								computationState.getStartTimestamp()
+							)
+						);
+						sharedBuffer.lock(
+							computationState.getPreviousState().getName(),
+							computationState.getEvent(),
+							computationState.getTimestamp());
+					}
+				}
+				break;
+				case TAKE:
+					final State<T> newState = edge.getTargetState();
+					final State<T> consumingState = edge.getSourceState();
+					final State<T> previousEventState = computationState.getPreviousState();
+
+					final T previousEvent = computationState.getEvent();
+					final DeweyNumber currentVersion = computationState.getVersion();
+
+					final DeweyNumber newComputationStateVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+					takeBranchesToVisit--;
+
+					final long startTimestamp;
+					if (computationState.isStartState()) {
+						startTimestamp = timestamp;
+						sharedBuffer.put(
+							consumingState.getName(),
+							event,
+							timestamp,
+							currentVersion);
+					} else {
+						startTimestamp = computationState.getStartTimestamp();
+						sharedBuffer.put(
+							consumingState.getName(),
+							event,
+							timestamp,
+							previousEventState.getName(),
+							previousEvent,
+							computationState.getTimestamp(),
+							currentVersion);
+					}
 
-			// this is for when we restore from legacy. In that case, the comparator is null
-			// as it did not exist in the previous Flink versions, so we have to initialize it here.
+					// a new computation state is referring to the shared entry
+					sharedBuffer.lock(consumingState.getName(), event, timestamp);
+
+					resultingComputationStates.add(ComputationState.createState(
+						newState,
+						consumingState,
+						event,
+						timestamp,
+						newComputationStateVersion,
+						startTimestamp
+					));
+
+					//check if newly created state is optional (have a PROCEED path to Final state)
+					final State<T> finalState = findFinalStateAfterProceed(newState, event);
+					if (finalState != null) {
+						sharedBuffer.lock(consumingState.getName(), event, timestamp);
+						resultingComputationStates.add(ComputationState.createState(
+							finalState,
+							consumingState,
+							event,
+							timestamp,
+							newComputationStateVersion,
+							startTimestamp));
+					}
+					break;
+			}
+		}
 
-			if (stateTransitionComparator == null) {
-				stateTransitionComparator = new StateTransitionComparator();
+		if (computationState.isStartState()) {
+			final int totalBranches = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(), outgoingEdges.getTotalTakeBranches());
+			final ComputationState<T> startState = createStartState(computationState, totalBranches);
+			resultingComputationStates.add(startState);
+		}
+
+		if (computationState.getEvent() != null) {
+			// release the shared entry referenced by the current computation state.
+			sharedBuffer.release(
+				computationState.getPreviousState().getName(),
+				computationState.getEvent(),
+				computationState.getTimestamp());
+			// try to remove unnecessary shared buffer entries
+			sharedBuffer.remove(
+				computationState.getPreviousState().getName(),
+				computationState.getEvent(),
+				computationState.getTimestamp());
+		}
+
+		return resultingComputationStates;
+	}
+
+	private State<T> findFinalStateAfterProceed(State<T> state, T event) {
+		final Stack<State<T>> statesToCheck = new Stack<>();
+		statesToCheck.push(state);
+
+		try {
+			while (!statesToCheck.isEmpty()) {
+				final State<T> currentState = statesToCheck.pop();
+				for (StateTransition<T> transition : currentState.getStateTransitions()) {
+					if (transition.getAction() == StateTransitionAction.PROCEED &&
+						checkFilterCondition(transition.getCondition(), event)) {
+						if (transition.getTargetState().isFinal()) {
+							return transition.getTargetState();
+						} else {
+							statesToCheck.push(transition.getTargetState());
+						}
+					}
+				}
 			}
+		} catch (Exception e) {
+			throw new RuntimeException("Failure happened in filter function.", e);
+		}
+
+		return null;
+	}
+
+	private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
+		return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1;
+	}
+
+	private ComputationState<T> createStartState(final ComputationState<T> computationState, final int totalBranches) {
+		final DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
+		return ComputationState.createStartState(computationState.getState(), startVersion);
+	}
 
-			// impose the IGNORE will be processed last
-			Collections.sort(stateTransitions, stateTransitionComparator);
+	private OutgoingEdges<T> createDecisionGraph(ComputationState<T> computationState, T event) {
+		final Stack<State<T>> states = new Stack<>();
+		states.push(computationState.getState());
+		final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(computationState.getState());
+		//First create all outgoing edges, so to be able to reason about the Dewey version
+		while (!states.isEmpty()) {
+			State<T> currentState = states.pop();
+			Collection<StateTransition<T>> stateTransitions = currentState.getStateTransitions();
 
 			// check all state transitions for each state
-			for (StateTransition<T> stateTransition: stateTransitions) {
+			for (StateTransition<T> stateTransition : stateTransitions) {
 				try {
-					if (stateTransition.getCondition() == null || stateTransition.getCondition().filter(event)) {
+					if (checkFilterCondition(stateTransition.getCondition(), event)) {
 						// filter condition is true
 						switch (stateTransition.getAction()) {
 							case PROCEED:
@@ -310,73 +519,8 @@ public class NFA<T> implements Serializable {
 								states.push(stateTransition.getTargetState());
 								break;
 							case IGNORE:
-								final DeweyNumber version;
-								if (branched) {
-									version = computationState.getVersion().increase();
-								} else {
-									version = computationState.getVersion();
-								}
-								resultingComputationStates.add(new ComputationState<T>(
-									computationState.getState(),
-									computationState.getEvent(),
-									computationState.getTimestamp(),
-									version,
-									computationState.getStartTimestamp()));
-
-								// we have a new computation state referring to the same the shared entry
-								// the lock of the current computation is released later on
-								sharedBuffer.lock(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
-								break;
 							case TAKE:
-								final State<T> newState = stateTransition.getTargetState();
-								final DeweyNumber oldVersion;
-								final DeweyNumber newComputationStateVersion;
-								final State<T> previousState = computationState.getState();
-								final T previousEvent = computationState.getEvent();
-								final long previousTimestamp;
-								final long startTimestamp;
-
-								if (computationState.isStartState()) {
-									oldVersion = new DeweyNumber(startEventCounter++);
-									newComputationStateVersion = oldVersion.addStage();
-									startTimestamp = timestamp;
-									previousTimestamp = -1L;
-
-								} else {
-									startTimestamp = computationState.getStartTimestamp();
-									previousTimestamp = computationState.getTimestamp();
-									oldVersion = computationState.getVersion();
-
-									branched = true;
-									newComputationStateVersion = oldVersion.addStage();
-								}
-
-								if (previousState.isStart()) {
-									sharedBuffer.put(
-										newState,
-										event,
-										timestamp,
-										oldVersion);
-								} else {
-									sharedBuffer.put(
-										newState,
-										event,
-										timestamp,
-										previousState,
-										previousEvent,
-										previousTimestamp,
-										oldVersion);
-								}
-
-								// a new computation state is referring to the shared entry
-								sharedBuffer.lock(newState, event, timestamp);
-
-								resultingComputationStates.add(new ComputationState<T>(
-									newState,
-									event,
-									timestamp,
-									newComputationStateVersion,
-									startTimestamp));
+								outgoingEdges.add(stateTransition);
 								break;
 						}
 					}
@@ -385,19 +529,12 @@ public class NFA<T> implements Serializable {
 				}
 			}
 		}
+		return outgoingEdges;
+	}
 
-		if (computationState.isStartState()) {
-			// a computation state is always kept if it refers to a starting state because every
-			// new element can start a new pattern
-			resultingComputationStates.add(computationState);
-		} else {
-			// release the shared entry referenced by the current computation state.
-			sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
-			// try to remove unnecessary shared buffer entries
-			sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
-		}
 
-		return resultingComputationStates;
+	private boolean checkFilterCondition(FilterFunction<T> condition, T event) throws Exception {
+		return condition == null || condition.filter(event);
 	}
 
 	/**
@@ -409,8 +546,8 @@ public class NFA<T> implements Serializable {
 	 * @return Collection of event sequences which end in the given computation state
 	 */
 	private Collection<Map<String, T>> extractPatternMatches(final ComputationState<T> computationState) {
-		Collection<LinkedHashMultimap<State<T>, T>> paths = sharedBuffer.extractPatterns(
-			computationState.getState(),
+		Collection<LinkedHashMultimap<String, T>> paths = sharedBuffer.extractPatterns(
+			computationState.getPreviousState().getName(),
 			computationState.getEvent(),
 			computationState.getTimestamp(),
 			computationState.getVersion());
@@ -420,19 +557,20 @@ public class NFA<T> implements Serializable {
 		TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer();
 
 		// generate the correct names from the collection of LinkedHashMultimaps
-		for (LinkedHashMultimap<State<T>, T> path: paths) {
+		for (LinkedHashMultimap<String, T> path: paths) {
 			Map<String, T> resultPath = new HashMap<>();
-			for (State<T> key: path.keySet()) {
+			for (String key: path.keySet()) {
 				int counter = 0;
 				Set<T> events = path.get(key);
 
 				// we iterate over the elements in insertion order
 				for (T event: events) {
 					resultPath.put(
-						events.size() > 1 ? generateStateName(key.getName(), counter): key.getName(),
+						events.size() > 1 ? generateStateName(key, counter): key,
 						// copy the element so that the user can change it
 						serializer.isImmutableType() ? event : serializer.copy(event)
 					);
+					counter++;
 				}
 			}
 
@@ -472,6 +610,7 @@ public class NFA<T> implements Serializable {
 
 	private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException {
 		oos.writeObject(computationState.getState());
+		oos.writeObject(computationState.getPreviousState());
 		oos.writeLong(computationState.getTimestamp());
 		oos.writeObject(computationState.getVersion());
 		oos.writeLong(computationState.getStartTimestamp());
@@ -490,6 +629,7 @@ public class NFA<T> implements Serializable {
 	@SuppressWarnings("unchecked")
 	private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		final State<T> state = (State<T>)ois.readObject();
+		final State<T> previousState = (State<T>)ois.readObject();
 		final long timestamp = ois.readLong();
 		final DeweyNumber version = (DeweyNumber)ois.readObject();
 		final long startTimestamp = ois.readLong();
@@ -504,7 +644,7 @@ public class NFA<T> implements Serializable {
 			event = null;
 		}
 
-		return new ComputationState<>(state, event, timestamp, version, startTimestamp);
+		return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp);
 	}
 
 	/**
@@ -629,20 +769,4 @@ public class NFA<T> implements Serializable {
 			return getClass().hashCode();
 		}
 	}
-
-	/**
-	 * Comparator used for imposing the assumption that IGNORE is always the last StateTransition in a state.
-	 */
-	private static final class StateTransitionComparator<T> implements Serializable, Comparator<StateTransition<T>> {
-
-		private static final long serialVersionUID = -2775474935413622278L;
-
-		@Override
-		public int compare(final StateTransition<T> o1, final StateTransition<T> o2) {
-			if (o1.getAction() == o2.getAction()) {
-				return 0;
-			}
-			return o1.getAction() == StateTransitionAction.IGNORE ? 1 : -1;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index b7e288b..e6a8c75 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -212,28 +212,27 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		SharedBufferEntry<K, V> entry = get(key, value, timestamp);
 
 		if (entry != null) {
-			extractionStates.add(new ExtractionState<K, V>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
+			extractionStates.add(new ExtractionState<>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
 
 			// use a depth first search to reconstruct the previous relations
 			while (!extractionStates.isEmpty()) {
-				ExtractionState<K, V> extractionState = extractionStates.pop();
-				DeweyNumber currentVersion = extractionState.getVersion();
+				final ExtractionState<K, V> extractionState = extractionStates.pop();
 				// current path of the depth first search
-				Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
+				final Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
+				final SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
 
 				// termination criterion
-				if (currentVersion.length() == 1) {
-					LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create();
+				if (currentEntry == null) {
+					final LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create();
 
 					while(!currentPath.isEmpty()) {
-						SharedBufferEntry<K, V> currentEntry = currentPath.pop();
+						final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop();
 
-						completePath.put(currentEntry.getKey(), currentEntry.getValueTime().getValue());
+						completePath.put(currentPathEntry.getKey(), currentPathEntry.getValueTime().getValue());
 					}
 
 					result.add(completePath);
 				} else {
-					SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
 
 					// append state to the path
 					currentPath.push(currentEntry);
@@ -242,17 +241,18 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 					for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
 						// we can only proceed if the current version is compatible to the version
 						// of this previous relation
+						final DeweyNumber currentVersion = extractionState.getVersion();
 						if (currentVersion.isCompatibleWith(edge.getVersion())) {
 							if (firstMatch) {
 								// for the first match we don't have to copy the current path
-								extractionStates.push(new ExtractionState<K, V>(edge.getTarget(), edge.getVersion(), currentPath));
+								extractionStates.push(new ExtractionState<>(edge.getTarget(), edge.getVersion(), currentPath));
 								firstMatch = false;
 							} else {
-								Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
+								final Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
 								copy.addAll(currentPath);
 
 								extractionStates.push(
-									new ExtractionState<K, V>(
+									new ExtractionState<>(
 										edge.getTarget(),
 										edge.getVersion(),
 										copy));
@@ -260,6 +260,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 						}
 					}
 				}
+
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 50b2cf3..7bcb6ea 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.functions.FilterFunction;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,7 +45,7 @@ public class State<T> implements Serializable {
 		this.name = name;
 		this.stateType = stateType;
 
-		stateTransitions = new ArrayList<StateTransition<T>>();
+		stateTransitions = new ArrayList<>();
 	}
 
 	public boolean isFinal() {
@@ -60,8 +62,32 @@ public class State<T> implements Serializable {
 		return stateTransitions;
 	}
 
-	public void addStateTransition(final StateTransition<T> stateTransition) {
-		stateTransitions.add(stateTransition);
+
+	private void addStateTransition(
+		final StateTransitionAction action,
+		final State<T> targetState,
+		final FilterFunction<T> condition) {
+		stateTransitions.add(new StateTransition<T>(this, action, targetState, condition));
+	}
+
+	public void addIgnore(final FilterFunction<T> condition) {
+		addStateTransition(StateTransitionAction.IGNORE, this, condition);
+	}
+
+	public void addIgnore(final State<T> targetState,final FilterFunction<T> condition) {
+		addStateTransition(StateTransitionAction.IGNORE, targetState, condition);
+	}
+
+	public void addTake(final State<T> targetState, final FilterFunction<T> condition) {
+		addStateTransition(StateTransitionAction.TAKE, targetState, condition);
+	}
+
+	public void addProceed(final State<T> targetState, final FilterFunction<T> condition) {
+		addStateTransition(StateTransitionAction.PROCEED, targetState, condition);
+	}
+
+	public void addTake(final FilterFunction<T> condition) {
+		addStateTransition(StateTransitionAction.TAKE, this, condition);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
index 479f28a..e3c7b7a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -27,12 +27,18 @@ public class StateTransition<T> implements Serializable {
 	private static final long serialVersionUID = -4825345749997891838L;
 
 	private final StateTransitionAction action;
+	private final State<T> sourceState;
 	private final State<T> targetState;
 	private final FilterFunction<T> condition;
 
-	public StateTransition(final StateTransitionAction action, final State<T> targetState, final FilterFunction<T> condition) {
+	public StateTransition(
+		final State<T> sourceState,
+		final StateTransitionAction action,
+		final State<T> targetState,
+		final FilterFunction<T> condition) {
 		this.action = action;
 		this.targetState = targetState;
+		this.sourceState = sourceState;
 		this.condition = condition;
 	}
 
@@ -44,6 +50,10 @@ public class StateTransition<T> implements Serializable {
 		return targetState;
 	}
 
+	public State<T> getSourceState() {
+		return sourceState;
+	}
+
 	public FilterFunction<T> getCondition() {
 		return condition;
 	}
@@ -55,6 +65,7 @@ public class StateTransition<T> implements Serializable {
 			StateTransition<T> other = (StateTransition<T>) obj;
 
 			return action == other.action &&
+				sourceState.getName().equals(other.sourceState.getName()) &&
 				targetState.getName().equals(other.targetState.getName());
 		} else {
 			return false;
@@ -64,14 +75,17 @@ public class StateTransition<T> implements Serializable {
 	@Override
 	public int hashCode() {
 		// we have to take the name of targetState because the transition might be reflexive
-		return Objects.hash(action, targetState.getName());
+		return Objects.hash(action, targetState.getName(), sourceState.getName());
 	}
 
 	@Override
 	public String toString() {
 		StringBuilder builder = new StringBuilder();
 
-		builder.append("StateTransition(").append(action).append(", ").append(targetState.getName());
+		builder.append("StateTransition(")
+			.append(action).append(", ")
+			.append(sourceState.getName()).append(", ")
+			.append(targetState.getName());
 
 		if (condition != null) {
 			builder.append(", with filter)");

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
index 70fc7fb..b8ca4e8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
@@ -22,7 +22,7 @@ package org.apache.flink.cep.nfa;
  * Set of actions when doing a state transition from a {@link State} to another.
  */
 public enum StateTransitionAction {
-	TAKE, // take the current event and assign it to the new state
-	IGNORE, // ignore the current event and do the state transition
+	TAKE, // take the current event and assign it to the current state
+	IGNORE, // ignore the current event
 	PROCEED // do the state transition and keep the current event for further processing (epsilon transition)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
deleted file mode 100644
index a3bb5f4..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa.compiler;
-
-/**
- * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern}
- * was not specified correctly.
- */
-public class MalformedPatternException extends RuntimeException {
-
-	private static final long serialVersionUID = 7751134834983361543L;
-
-	public MalformedPatternException(String message) {
-		super(message);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 18ed21f..b476c49 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -22,18 +22,21 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
-import org.apache.flink.cep.nfa.StateTransition;
-import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.FilterFunctions;
 import org.apache.flink.cep.pattern.FollowedByPattern;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.NotFilterFunction;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -42,7 +45,7 @@ import java.util.Set;
  */
 public class NFACompiler {
 
-	protected final static String BEGINNING_STATE_NAME = "$beginningState$";
+	protected static final String ENDING_STATE_NAME = "$endState$";
 
 	/**
 	 * Compiles the given pattern into a {@link NFA}.
@@ -74,88 +77,288 @@ public class NFACompiler {
 	 */
 	@SuppressWarnings("unchecked")
 	public static <T> NFAFactory<T> compileFactory(
-		Pattern<T, ?> pattern,
-		TypeSerializer<T> inputTypeSerializer,
+		final Pattern<T, ?> pattern,
+		final TypeSerializer<T> inputTypeSerializer,
 		boolean timeoutHandling) {
 		if (pattern == null) {
 			// return a factory for empty NFAs
-			return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
+			return new NFAFactoryImpl<>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
 		} else {
-			// set of all generated states
-			Map<String, State<T>> states = new HashMap<>();
-			long windowTime;
+			final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
+			nfaFactoryCompiler.compileFactory();
+			return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
+		}
+	}
+
+	/**
+	 * Converts a {@link Pattern} into graph of {@link State}. It enables sharing of
+	 * compilation state across methods.
+	 *
+	 * @param <T>
+	 */
+	private static class NFAFactoryCompiler<T> {
+
+		private final Set<String> usedNames = new HashSet<>();
+		private final List<State<T>> states = new ArrayList<>();
 
-			// this is used to enforse pattern name uniqueness.
-			Set<String> patternNames = new HashSet<>();
+		private long windowTime = 0;
+		private Pattern<T, ?> currentPattern;
 
-			Pattern<T, ?> succeedingPattern;
-			State<T> succeedingState;
-			Pattern<T, ?> currentPattern = pattern;
+		NFAFactoryCompiler(final Pattern<T, ?> pattern) {
+			this.currentPattern = pattern;
+		}
 
+		/**
+		 * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create
+		 * multiple NFAs.
+		 */
+		void compileFactory() {
 			// we're traversing the pattern from the end to the beginning --> the first state is the final state
-			State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final);
-			patternNames.add(currentPattern.getName());
+			State<T> sinkState = createEndingState();
+			// add all the normal states
+			sinkState = createMiddleStates(sinkState);
+			// add the beginning state
+			createStartState(sinkState);
+		}
 
-			states.put(currentPattern.getName(), currentState);
+		List<State<T>> getStates() {
+			return states;
+		}
+
+		long getWindowTime() {
+			return windowTime;
+		}
+
+		/**
+		 * Creates the dummy Final {@link State} of the NFA graph.
+		 * @return dummy Final state
+		 */
+		private State<T> createEndingState() {
+			State<T> endState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
+			states.add(endState);
+			usedNames.add(ENDING_STATE_NAME);
 
 			windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() : 0L;
+			return endState;
+		}
 
-			while (currentPattern.getPrevious() != null) {
-				succeedingPattern = currentPattern;
-				succeedingState = currentState;
-				currentPattern = currentPattern.getPrevious();
+		/**
+		 * Creates all the states between Start and Final state.
+		 * @param sinkState the state that last state should point to (always the Final state)
+		 * @return the next state after Start in the resulting graph
+		 */
+		private State<T> createMiddleStates(final State<T> sinkState) {
 
-				if (!patternNames.add(currentPattern.getName())) {
-					throw new MalformedPatternException("Duplicate pattern name: " + currentPattern.getName() + ". " +
-						"Pattern names must be unique.");
+			State<T> lastSink = sinkState;
+			while (currentPattern.getPrevious() != null) {
+				checkPatternNameUniqueness();
+
+				State<T> sourceState = new State<>(currentPattern.getName(), State.StateType.Normal);
+				states.add(sourceState);
+				usedNames.add(sourceState.getName());
+
+				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
+					convertToLooping(sourceState, lastSink);
+
+					if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+						sourceState = createFirstMandatoryStateOfLoop(sourceState, State.StateType.Normal);
+						states.add(sourceState);
+						usedNames.add(sourceState.getName());
+					}
+				} else if (currentPattern.getQuantifier() == Quantifier.TIMES) {
+					sourceState = convertToTimesState(sourceState, lastSink, currentPattern.getTimes());
+				} else {
+					convertToSingletonState(sourceState, lastSink);
 				}
 
-				Time currentWindowTime = currentPattern.getWindowTime();
+				currentPattern = currentPattern.getPrevious();
+				lastSink = sourceState;
 
+				final Time currentWindowTime = currentPattern.getWindowTime();
 				if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
 					// the window time is the global minimum of all window times of each state
 					windowTime = currentWindowTime.toMilliseconds();
 				}
+			}
+
+			return lastSink;
+		}
+
+		private void checkPatternNameUniqueness() {
+			if (usedNames.contains(currentPattern.getName())) {
+				throw new MalformedPatternException(
+					"Duplicate pattern name: " + currentPattern.getName() + ". " +
+					"Pattern names must be unique.");
+			}
+		}
+
+		/**
+		 * Creates the Start {@link State} of the resulting NFA graph.
+		 * @param sinkState the state that Start state should point to (alwyas first state of middle states)
+		 * @return created state
+		 */
+		@SuppressWarnings("unchecked")
+		private State<T> createStartState(State<T> sinkState) {
+			checkPatternNameUniqueness();
+
+			final State<T> beginningState;
+			if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
+				final State<T> loopingState;
+				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+					loopingState = new State<>(currentPattern.getName(), State.StateType.Normal);
+					beginningState = createFirstMandatoryStateOfLoop(loopingState, State.StateType.Start);
+					states.add(loopingState);
+				} else {
+					loopingState = new State<>(currentPattern.getName(), State.StateType.Start);
+					beginningState = loopingState;
+				}
+				convertToLooping(loopingState, sinkState, true);
+			} else  {
+				if (currentPattern.getQuantifier() == Quantifier.TIMES && currentPattern.getTimes() > 1) {
+					final State<T> timesState = new State<>(currentPattern.getName(), State.StateType.Normal);
+					states.add(timesState);
+					sinkState = convertToTimesState(timesState, sinkState, currentPattern.getTimes() - 1);
+				}
 
-				if (states.containsKey(currentPattern.getName())) {
-					currentState = states.get(currentPattern.getName());
+				beginningState = new State<>(currentPattern.getName(), State.StateType.Start);
+				convertToSingletonState(beginningState, sinkState);
+			}
+
+			states.add(beginningState);
+			usedNames.add(beginningState.getName());
+
+			return beginningState;
+		}
+
+		/**
+		 * Converts the given state into a "complex" state consisting of given number of states with
+		 * same {@link FilterFunction}
+		 *
+		 * @param sourceState the state to be converted
+		 * @param sinkState the state that the converted state should point to
+		 * @param times number of times the state should be copied
+		 * @return the first state of the "complex" state, next state should point to it
+		 */
+		private State<T> convertToTimesState(final State<T> sourceState, final State<T> sinkState, int times) {
+			convertToSingletonState(sourceState, sinkState);
+			State<T> lastSink;
+			State<T> firstState = sourceState;
+			for (int i = 0; i < times - 1; i++) {
+				lastSink = firstState;
+				firstState = new State<>(currentPattern.getName(), State.StateType.Normal);
+				states.add(firstState);
+				convertToSingletonState(firstState, lastSink);
+			}
+			return firstState;
+		}
+
+		/**
+		 * Converts the given state into a simple single state. For an OPTIONAL state it also consists
+		 * of a similar state without the PROCEED edge, so that for each PROCEED transition branches
+		 * in computation state graph  can be created only once.
+		 *
+		 * @param sourceState the state to be converted
+		 * @param sinkState state that the state being converted should point to
+		 */
+		@SuppressWarnings("unchecked")
+		private void convertToSingletonState(final State<T> sourceState, final State<T> sinkState) {
+
+			final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+			final FilterFunction<T> trueFunction = FilterFunctions.trueFunction();
+			sourceState.addTake(sinkState, currentFilterFunction);
+
+			if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
+				sourceState.addProceed(sinkState, trueFunction);
+			}
+
+			if (currentPattern instanceof FollowedByPattern) {
+				final State<T> ignoreState;
+				if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
+					ignoreState = new State<>(currentPattern.getName(), State.StateType.Normal);
+					ignoreState.addTake(sinkState, currentFilterFunction);
+					states.add(ignoreState);
 				} else {
-					currentState = new State<>(currentPattern.getName(), State.StateType.Normal);
-					states.put(currentState.getName(), currentState);
+					ignoreState = sourceState;
 				}
+				sourceState.addIgnore(ignoreState, trueFunction);
+			}
+		}
 
-				currentState.addStateTransition(new StateTransition<T>(
-					StateTransitionAction.TAKE,
-					succeedingState,
-					(FilterFunction<T>) succeedingPattern.getFilterFunction()));
-
-				if (succeedingPattern instanceof FollowedByPattern) {
-					// the followed by pattern entails a reflexive ignore transition
-					currentState.addStateTransition(new StateTransition<T>(
-						StateTransitionAction.IGNORE,
-						currentState,
-						null
-					));
+		/**
+		 * Patterns with quantifiers AT_LEAST_ONE_* are converted into pair of states: a singleton state and
+		 * looping state. This method creates the first of the two.
+		 *
+		 * @param sinkState the state the newly created state should point to, it should be a looping state
+		 * @param stateType the type of the created state, as the NFA graph can also start wit AT_LEAST_ONE_*
+		 * @return the newly created state
+		 */
+		@SuppressWarnings("unchecked")
+		private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState, final State.StateType stateType) {
+
+			final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+			final State<T> firstState = new State<>(currentPattern.getName(), stateType);
+
+			firstState.addTake(sinkState, currentFilterFunction);
+			if (currentPattern instanceof FollowedByPattern) {
+				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
+					firstState.addIgnore(new NotFilterFunction<>(currentFilterFunction));
+				} else {
+					firstState.addIgnore(FilterFunctions.<T>trueFunction());
 				}
 			}
+			return firstState;
+		}
 
-			// add the beginning state
-			final State<T> beginningState;
+		/**
+		 * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
+		 * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
+		 * for each PROCEED transition branches in computation state graph  can be created only once.
+		 *
+		 * <p>If this looping state is first of a graph we should treat the {@link Pattern} as {@link FollowedByPattern}
+		 * to enable combinations.
+		 *
+		 * @param sourceState  the state to converted
+		 * @param sinkState    the state that the converted state should point to
+		 * @param isFirstState if the looping state is first of a graph
+		 */
+		@SuppressWarnings("unchecked")
+		private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) {
+
+			final FilterFunction<T> filterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+			final FilterFunction<T> trueFunction = FilterFunctions.<T>trueFunction();
+
+			sourceState.addProceed(sinkState, trueFunction);
+			sourceState.addTake(filterFunction);
+			if (currentPattern instanceof FollowedByPattern || isFirstState) {
+				final State<T> ignoreState = new State<>(
+					currentPattern.getName(),
+					State.StateType.Normal);
+
+
+				final FilterFunction<T> ignoreCondition;
+				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
+					ignoreCondition = new NotFilterFunction<>(filterFunction);
+				} else {
+					ignoreCondition = trueFunction;
+				}
 
-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
+				sourceState.addIgnore(ignoreState, ignoreCondition);
+				ignoreState.addTake(sourceState, filterFunction);
+				ignoreState.addIgnore(ignoreState, ignoreCondition);
+				states.add(ignoreState);
 			}
+		}
 
-			beginningState.addStateTransition(new StateTransition<T>(
-				StateTransitionAction.TAKE,
-				currentState,
-				(FilterFunction<T>) currentPattern.getFilterFunction()
-			));
-
-			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		/**
+		 * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
+		 * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
+		 * for each PROCEED transition branches in computation state graph  can be created only once.
+		 *
+		 * @param sourceState the state to converted
+		 * @param sinkState   the state that the converted state should point to
+		 */
+		private void convertToLooping(final State<T> sourceState, final State<T> sinkState) {
+			convertToLooping(sourceState, sinkState, false);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
new file mode 100644
index 0000000..12e58ba
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+public class FilterFunctions<T> {
+
+	private FilterFunctions() {
+	}
+
+	public static <T> FilterFunction<T> trueFunction()  {
+		return new FilterFunction<T>() {
+			@Override
+			public boolean filter(T value) throws Exception {
+				return true;
+			}
+		};
+	}
+
+	public static <T> FilterFunction<T> falseFunction()  {
+		return new FilterFunction<T>() {
+			@Override
+			public boolean filter(T value) throws Exception {
+				return false;
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
new file mode 100644
index 0000000..c85f3be
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+/**
+ * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern}
+ * was not specified correctly.
+ */
+public class MalformedPatternException extends RuntimeException {
+
+	private static final long serialVersionUID = 7751134834983361543L;
+
+	public MalformedPatternException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
new file mode 100644
index 0000000..a4fc8f5
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A filter function which negates filter function.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class NotFilterFunction<T> implements FilterFunction<T> {
+	private static final long serialVersionUID = -2109562093871155005L;
+
+	private final FilterFunction<T> original;
+
+	public NotFilterFunction(final FilterFunction<T> original) {
+		this.original = original;
+	}
+
+	@Override
+	public boolean filter(T value) throws Exception {
+		return !original.filter(value);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 7ea675f..7b4d9c7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Base class for a pattern definition.
@@ -53,6 +54,10 @@ public class Pattern<T, F extends T> {
 	// window length in which the pattern match has to occur
 	private Time windowTime;
 
+	private Quantifier quantifier = Quantifier.ONE;
+
+	private int times;
+
 	protected Pattern(final String name, final Pattern<T, ? extends T> previous) {
 		this.name = name;
 		this.previous = previous;
@@ -74,6 +79,14 @@ public class Pattern<T, F extends T> {
 		return windowTime;
 	}
 
+	public Quantifier getQuantifier() {
+		return quantifier;
+	}
+
+	public int getTimes() {
+		return times;
+	}
+
 	/**
 	 * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
 	 *
@@ -183,4 +196,106 @@ public class Pattern<T, F extends T> {
 		return new Pattern<X, X>(name, null);
 	}
 
+	/**
+	 * Specifies that this pattern can occur zero or more times(kleene star).
+	 * This means any number of events can be matched in this state.
+	 *
+	 * @return The same pattern with applied Kleene star operator
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> zeroOrMore() {
+		return zeroOrMore(true);
+	}
+
+	/**
+	 * Specifies that this pattern can occur zero or more times(kleene star).
+	 * This means any number of events can be matched in this state.
+	 *
+	 * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will generate patterns:
+	 * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.
+	 *
+	 * @param eager if true the pattern always consumes earlier events
+	 * @return The same pattern with applied Kleene star operator
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> zeroOrMore(final boolean eager) {
+		checkIfQuantifierApplied();
+		if (eager) {
+			this.quantifier = Quantifier.ZERO_OR_MORE_EAGER;
+		} else {
+			this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS;
+		}
+		return this;
+	}
+
+	/**
+	 * Specifies that this pattern can occur one or more times(kleene star).
+	 * This means at least one and at most infinite number of events can be matched in this state.
+	 *
+	 * @return The same pattern with applied Kleene plus operator
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> oneOrMore() {
+		return oneOrMore(true);
+	}
+
+	/**
+	 * Specifies that this pattern can occur one or more times(kleene star).
+	 * This means at least one and at most infinite number of events can be matched in this state.
+	 *
+	 * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will generate patterns:
+	 * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.
+	 *
+	 * @param eager if true the pattern always consumes earlier events
+	 * @return The same pattern with applied Kleene plus operator
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> oneOrMore(final boolean eager) {
+		checkIfQuantifierApplied();
+		if (eager) {
+			this.quantifier = Quantifier.ONE_OR_MORE_EAGER;
+		} else {
+			this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS;
+		}
+		return this;
+	}
+
+	/**
+	 * Specifies that this pattern can occur zero or once.
+	 *
+	 * @return The same pattern with applied Kleene ? operator
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> optional() {
+		checkIfQuantifierApplied();
+		this.quantifier = Quantifier.OPTIONAL;
+		return this;
+	}
+
+	/**
+	 * Specifies exact number of times that this pattern should be matched.
+	 *
+	 * @param times number of times matching event must appear
+	 * @return The same pattern with number of times applied
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> times(int times) {
+		checkIfQuantifierApplied();
+		Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
+		this.quantifier = Quantifier.TIMES;
+		this.times = times;
+		return this;
+	}
+
+	private void checkIfQuantifierApplied() {
+		if (this.quantifier != Quantifier.ONE) {
+			throw new MalformedPatternException("Already applied quantifier to this Pattern. Current quantifier is: " + this.quantifier);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
new file mode 100644
index 0000000..7abe9bd
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern;
+
+import java.util.EnumSet;
+
+public enum Quantifier {
+	ONE,
+	ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, QuantifierProperty.EAGER),
+	ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
+	ONE_OR_MORE_EAGER(
+		QuantifierProperty.LOOPING,
+		QuantifierProperty.EAGER,
+		QuantifierProperty.AT_LEAST_ONE),
+	ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, QuantifierProperty.AT_LEAST_ONE),
+	TIMES,
+	OPTIONAL;
+
+	private final EnumSet<QuantifierProperty> properties;
+
+	Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) {
+		this.properties = EnumSet.of(first, rest);
+	}
+
+	Quantifier() {
+		this.properties = EnumSet.noneOf(QuantifierProperty.class);
+	}
+
+	public boolean hasProperty(QuantifierProperty property) {
+		return properties.contains(property);
+	}
+
+	public enum QuantifierProperty {
+		LOOPING,
+		EAGER,
+		AT_LEAST_ONE
+	}
+
+}


[2/3] flink git commit: [FLINK-3318] Add support for quantifiers to CEP's pattern API

Posted by kk...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index ccae848..825ba957 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -46,9 +46,9 @@ public class NFAITCase extends TestLogger {
 	public void testSimplePatternNFA() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
-		Event startEvent = new Event(42, "start", 1.0);
+		Event startEvent = new Event(41, "start", 1.0);
 		SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-		Event endEvent=  new Event(43, "end", 1.0);
+		Event endEvent = new Event(43, "end", 1.0);
 
 		inputEvents.add(new StreamRecord<Event>(startEvent, 1));
 		inputEvents.add(new StreamRecord<Event>(new Event(43, "foobar", 1.0), 2));
@@ -102,6 +102,99 @@ public class NFAITCase extends TestLogger {
 		assertEquals(endEvent, patternMap.get("end"));
 	}
 
+	@Test
+	public void testStrictContinuityWithResults() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event end = new Event(42, "b", 4.0);
+
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(end, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).next("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(1, allPatterns.size());
+		assertEquals(Sets.<Set<Event>>newHashSet(
+			Sets.newHashSet(middleEvent1, end)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testStrictContinuityNoResults() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "c", 3.0);
+		Event end = new Event(43, "b", 4.0);
+
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(end, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).next("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+			}
+		}
+
+		assertEquals(Sets.newHashSet(), resultingPatterns);
+	}
+
 	/**
 	 * Tests that the NFA successfully filters out expired elements with respect to the window
 	 * length
@@ -327,6 +420,1247 @@ public class NFAITCase extends TestLogger {
 		), patterns);
 	}
 
+	@Test
+	public void testComplexBranchingAfterKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+		Event end2 = new Event(45, "d", 6.0);
+		Event end3 = new Event(46, "d", 7.0);
+		Event end4 = new Event(47, "e", 8.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+		inputEvents.add(new StreamRecord<>(end2, 7));
+		inputEvents.add(new StreamRecord<>(end3, 8));
+		inputEvents.add(new StreamRecord<>(end4, 9));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		})
+			.followedBy("end2").where(new FilterFunction<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			})
+			.followedBy("end3").where(new FilterFunction<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("e");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(16, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent1, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent2, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent3, end1, end2, end4),
+			Sets.newHashSet(startEvent, end1, end2, end4),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent1, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent2, end1, end3, end4),
+			Sets.newHashSet(startEvent, middleEvent3, end1, end3, end4),
+			Sets.newHashSet(startEvent, end1, end3, end4)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(4, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+			Sets.newHashSet(startEvent, middleEvent1, end1),
+			Sets.newHashSet(startEvent, middleEvent2, end1),
+			Sets.newHashSet(startEvent, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testEagerKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(true).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(4, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+			Sets.newHashSet(startEvent, middleEvent1, end1),
+			Sets.newHashSet(startEvent, end1)
+		), resultingPatterns);
+	}
+
+
+	@Test
+	public void testBeginWithKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event middleEvent1 = new Event(40, "a", 2.0);
+		Event middleEvent2 = new Event(41, "a", 3.0);
+		Event middleEvent3 = new Event(41, "a", 3.0);
+		Event end = new Event(42, "b", 4.0);
+
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore().followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(7, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(middleEvent1, middleEvent2, middleEvent3, end),
+			Sets.newHashSet(middleEvent1, middleEvent2, end),
+			Sets.newHashSet(middleEvent2, middleEvent3, end),
+			Sets.newHashSet(middleEvent1, end),
+			Sets.newHashSet(middleEvent2, end),
+			Sets.newHashSet(middleEvent3, end),
+			Sets.newHashSet(end)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testKleeneStarAfterKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "d", 3.0);
+		Event middleEvent3 = new Event(43, "d", 4.0);
+		Event end = new Event(44, "e", 4.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle-first").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(false).followedBy("middle-second").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("e");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(8, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end),
+			Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end),
+			Sets.newHashSet(startEvent, middleEvent3, end),
+			Sets.newHashSet(startEvent, middleEvent2, end),
+			Sets.newHashSet(startEvent, middleEvent1, end),
+			Sets.newHashSet(startEvent, end)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testKleeneStarAfterBranching() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event merging = new Event(42, "f", 3.0);
+		Event kleene1 = new Event(43, "d", 4.0);
+		Event kleene2 = new Event(44, "d", 4.0);
+		Event end = new Event(45, "e", 4.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(merging, 5));
+		inputEvents.add(new StreamRecord<>(kleene1, 6));
+		inputEvents.add(new StreamRecord<>(kleene2, 7));
+		inputEvents.add(new StreamRecord<>(end, 8));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("branching").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedBy("merging").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("f");
+			}
+		}).followedBy("kleene").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("e");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(8, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, merging, end),
+			Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, end),
+			Sets.newHashSet(startEvent, middleEvent1, merging, kleene2, end),
+			Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, kleene2, end),
+			Sets.newHashSet(startEvent, middleEvent2, merging, end),
+			Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, end),
+			Sets.newHashSet(startEvent, middleEvent2, merging, kleene2, end),
+			Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, kleene2, end)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testStrictContinuityNoResultsAfterKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event start = new Event(40, "d", 2.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 2.0);
+		Event middleEvent3 = new Event(43, "c", 3.0);
+		Event end = new Event(44, "b", 4.0);
+
+		inputEvents.add(new StreamRecord<>(start, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+		inputEvents.add(new StreamRecord<>(end, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore()
+			.next("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+			}
+		}
+
+		assertEquals(Sets.newHashSet(), resultingPatterns);
+	}
+
+	@Test
+	public void testStrictContinuityResultsAfterKleeneStar() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event start = new Event(40, "d", 2.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 2.0);
+		Event end = new Event(43, "b", 4.0);
+
+		inputEvents.add(new StreamRecord<>(start, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(end, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(false)
+			.next("end").where(new FilterFunction<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(2, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(start, middleEvent1, middleEvent2, end),
+			Sets.newHashSet(start, middleEvent2, end)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testAtLeastOne() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(3, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+			Sets.newHashSet(startEvent, middleEvent1, end1),
+			Sets.newHashSet(startEvent, middleEvent2, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testBeginWithAtLeastOne() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent1 = new Event(41, "a", 2.0);
+		Event startEvent2 = new Event(42, "a", 3.0);
+		Event startEvent3 = new Event(42, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 3));
+		inputEvents.add(new StreamRecord<>(startEvent2, 4));
+		inputEvents.add(new StreamRecord<>(startEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(7, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent1, startEvent2, startEvent3, end1),
+			Sets.newHashSet(startEvent1, startEvent2, end1),
+			Sets.newHashSet(startEvent1, startEvent3, end1),
+			Sets.newHashSet(startEvent2, startEvent3, end1),
+			Sets.newHashSet(startEvent1, end1),
+			Sets.newHashSet(startEvent2, end1),
+			Sets.newHashSet(startEvent3, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testNextZeroOrMore() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "start", 1.0);
+		Event middleEvent1 = new Event(40, "middle", 2.0);
+		Event middleEvent2 = new Event(40, "middle", 3.0);
+		Event middleEvent3 = new Event(40, "middle", 4.0);
+		Event endEvent = new Event(46, "end", 1.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1L));
+		inputEvents.add(new StreamRecord<>(new Event(1, "event", 1.0), 2L));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3L));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4L));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5L));
+		inputEvents.add(new StreamRecord<>(endEvent, 6L));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).next("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("middle");
+			}
+		}).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(1, allPatterns.size());
+		assertEquals(Sets.<Set<Event>>newHashSet(
+			Sets.newHashSet(startEvent, endEvent)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testAtLeastOneEager() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore(true).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(3, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+			Sets.newHashSet(startEvent, middleEvent1, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).optional().followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(2, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent, middleEvent, end1),
+			Sets.newHashSet(startEvent, end1)
+		), resultingPatterns);
+	}
+
+
+	@Test
+	public void testTimes() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).next("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(1, allPatterns.size());
+		assertEquals(Sets.<Set<Event>>newHashSet(
+			Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testStartWithTimes() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(2, allPatterns.size());
+		assertEquals(Sets.<Set<Event>>newHashSet(
+			Sets.newHashSet(middleEvent1, middleEvent2, end1),
+			Sets.newHashSet(middleEvent2, middleEvent3, end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testStartWithOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).optional().followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(2, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent,  end1),
+			Sets.newHashSet(end1)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testEndWithZeroOrMore() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(4, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2, middleEvent3),
+			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2),
+			Sets.newHashSet(startEvent,  middleEvent1),
+			Sets.newHashSet(startEvent)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testStartAndEndWithZeroOrMore() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "d", 5.0);
+		Event end2 = new Event(45, "d", 5.0);
+		Event end3 = new Event(46, "d", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(end1, 6));
+		inputEvents.add(new StreamRecord<>(end2, 6));
+		inputEvents.add(new StreamRecord<>(end3, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(6, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(middleEvent1,  middleEvent2, middleEvent3),
+			Sets.newHashSet(middleEvent1,  middleEvent2),
+			Sets.newHashSet(middleEvent1),
+			Sets.newHashSet(middleEvent2,  middleEvent3),
+			Sets.newHashSet(middleEvent2),
+			Sets.newHashSet(middleEvent3)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testEndWithOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).optional();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(2, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent,  middleEvent1),
+			Sets.newHashSet(startEvent)
+		), resultingPatterns);
+	}
+
+	@Test
+	public void testEndWithOneOrMore() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		Set<Set<Event>> resultingPatterns = new HashSet<>();
+		List<Collection<Event>> allPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, Event>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, Event> foundPattern : patterns) {
+				resultingPatterns.add(new HashSet<>(foundPattern.values()));
+				allPatterns.add(foundPattern.values());
+			}
+		}
+
+		assertEquals(3, allPatterns.size());
+		assertEquals(Sets.newHashSet(
+			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2, middleEvent3),
+			Sets.newHashSet(startEvent,  middleEvent1, middleEvent2),
+			Sets.newHashSet(startEvent,  middleEvent1)
+		), resultingPatterns);
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 9f65132..40a0e7e 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -21,6 +21,7 @@ package org.apache.flink.cep.nfa;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.cep.Event;
+import org.apache.flink.cep.pattern.FilterFunctions;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -51,12 +52,12 @@ public class NFATest extends TestLogger {
 		streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L));
 		streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L));
 
-		State<Event> startingState = new State<>("", State.StateType.Start);
-		State<Event> startState = new State<>("start", State.StateType.Normal);
-		State<Event> endState = new State<>("end", State.StateType.Final);
-		StateTransition<Event> starting2Start = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			startState,
+		State<Event> startState = new State<>("start", State.StateType.Start);
+		State<Event> endState = new State<>("end", State.StateType.Normal);
+		State<Event> endingState = new State<>("", State.StateType.Final);
+
+		startState.addTake(
+			endState,
 			new FilterFunction<Event>() {
 				private static final long serialVersionUID = -4869589195918650396L;
 
@@ -64,12 +65,9 @@ public class NFATest extends TestLogger {
 				public boolean filter(Event value) throws Exception {
 					return value.getName().equals("start");
 				}
-			}
-		);
-
-		StateTransition<Event> start2End = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			endState,
+			});
+		endState.addTake(
+			endingState,
 			new FilterFunction<Event>() {
 				private static final long serialVersionUID = 2979804163709590673L;
 
@@ -77,18 +75,12 @@ public class NFATest extends TestLogger {
 				public boolean filter(Event value) throws Exception {
 					return value.getName().equals("end");
 				}
-			}
-		);
-
-		StateTransition<Event> start2Start = new StateTransition<>(StateTransitionAction.IGNORE, startState, null);
-
-		startingState.addStateTransition(starting2Start);
-		startState.addStateTransition(start2End);
-		startState.addStateTransition(start2Start);
+			});
+		endState.addIgnore(FilterFunctions.<Event>trueFunction());
 
-		nfa.addState(startingState);
 		nfa.addState(startState);
 		nfa.addState(endState);
+		nfa.addState(endingState);
 
 		Set<Map<String, Event>> expectedPatterns = new HashSet<>();
 
@@ -196,8 +188,10 @@ public class NFATest extends TestLogger {
 	public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) {
 		Set<Map<String, T>> actualPatterns = new HashSet<>();
 
-		for (StreamRecord<T> streamEvent: inputs) {
-			Collection<Map<String, T>> matchedPatterns = nfa.process(streamEvent.getValue(), streamEvent.getTimestamp()).f0;
+		for (StreamRecord<T> streamEvent : inputs) {
+			Collection<Map<String, T>> matchedPatterns = nfa.process(
+				streamEvent.getValue(),
+				streamEvent.getTimestamp()).f0;
 
 			actualPatterns.addAll(matchedPatterns);
 		}
@@ -213,24 +207,12 @@ public class NFATest extends TestLogger {
 		State<Event> startState = new State<>("start", State.StateType.Normal);
 		State<Event> endState = new State<>("end", State.StateType.Final);
 
-		StateTransition<Event> starting2Start = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			startState,
-			new NameFilter("start"));
 
-		StateTransition<Event> start2End = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			endState,
+		startingState.addTake(
+			new NameFilter("start"));
+		startState.addTake(
 			new NameFilter("end"));
-
-		StateTransition<Event> start2Start = new StateTransition<>(
-			StateTransitionAction.IGNORE,
-			startState,
-			null);
-
-		startingState.addStateTransition(starting2Start);
-		startState.addStateTransition(start2End);
-		startState.addStateTransition(start2Start);
+		startState.addIgnore(null);
 
 		nfa.addState(startingState);
 		nfa.addState(startState);
@@ -253,12 +235,12 @@ public class NFATest extends TestLogger {
 	private NFA<Event> createStartEndNFA(long windowLength) {
 		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), windowLength, false);
 
-		State<Event> startingState = new State<>("", State.StateType.Start);
-		State<Event> startState = new State<>("start", State.StateType.Normal);
-		State<Event> endState = new State<>("end", State.StateType.Final);
-		StateTransition<Event> starting2Start = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			startState,
+		State<Event> startState = new State<>("start", State.StateType.Start);
+		State<Event> endState = new State<>("end", State.StateType.Normal);
+		State<Event> endingState = new State<>("", State.StateType.Final);
+
+		startState.addTake(
+			endState,
 			new FilterFunction<Event>() {
 				private static final long serialVersionUID = -4869589195918650396L;
 
@@ -267,10 +249,8 @@ public class NFATest extends TestLogger {
 					return value.getName().equals("start");
 				}
 			});
-
-		StateTransition<Event> start2End = new StateTransition<>(
-			StateTransitionAction.TAKE,
-			endState,
+		endState.addTake(
+			endingState,
 			new FilterFunction<Event>() {
 				private static final long serialVersionUID = 2979804163709590673L;
 
@@ -279,19 +259,11 @@ public class NFATest extends TestLogger {
 					return value.getName().equals("end");
 				}
 			});
+		endState.addIgnore(FilterFunctions.<Event>trueFunction());
 
-		StateTransition<Event> start2Start = new StateTransition<>(
-			StateTransitionAction.IGNORE,
-			startState,
-			null);
-
-		startingState.addStateTransition(starting2Start);
-		startState.addStateTransition(start2End);
-		startState.addStateTransition(start2Start);
-
-		nfa.addState(startingState);
 		nfa.addState(startState);
 		nfa.addState(endState);
+		nfa.addState(endingState);
 
 		return nfa;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index d11f3a8..93d78cc 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
+import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
@@ -28,22 +30,45 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
 import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import static org.junit.Assert.assertTrue;
+import static com.google.common.collect.Sets.newHashSet;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class NFACompilerTest extends TestLogger {
 
+	private static final FilterFunction<Event> startFilter = new FilterFunction<Event>() {
+		private static final long serialVersionUID = 3314714776170474221L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getPrice() > 2;
+		}
+	};
+
+	private static final FilterFunction<Event> endFilter = new FilterFunction<Event>() {
+		private static final long serialVersionUID = 3990995859716364087L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getName().equals("end");
+		}
+	};
+
+	private static final TypeSerializer<Event> serializer = TypeExtractor.createTypeInfo(Event.class)
+		.createSerializer(new ExecutionConfig());
+
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
 
@@ -81,83 +106,96 @@ public class NFACompilerTest extends TestLogger {
 	 */
 	@Test
 	public void testNFACompilerWithSimplePattern() {
-		Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
-			private static final long serialVersionUID = 3314714776170474221L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getPrice() > 2;
-			}
-		})
-		.followedBy("middle").subtype(SubEvent.class)
-		.next("end").where(new FilterFunction<Event>() {
-				private static final long serialVersionUID = 3990995859716364087L;
+		Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
+			.followedBy("middle").subtype(SubEvent.class)
+			.next("end").where(endFilter);
 
-				@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("end");
-			}
-		});
-
-		TypeInformation<Event> typeInformation = TypeExtractor.createTypeInfo(Event.class);
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, typeInformation.createSerializer(new ExecutionConfig()), false);
+		NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
 
 		Set<State<Event>> states = nfa.getStates();
-
 		assertEquals(4, states.size());
 
 		Map<String, State<Event>> stateMap = new HashMap<>();
-
-		for (State<Event> state: states) {
+		for (State<Event> state : states) {
 			stateMap.put(state.getName(), state);
 		}
 
-		assertTrue(stateMap.containsKey(NFACompiler.BEGINNING_STATE_NAME));
-		State<Event> beginningState = stateMap.get(NFACompiler.BEGINNING_STATE_NAME);
-
-		assertTrue(beginningState.isStart());
-
 		assertTrue(stateMap.containsKey("start"));
 		State<Event> startState = stateMap.get("start");
+		assertTrue(startState.isStart());
+		final Set<Tuple2<String, StateTransitionAction>> startTransitions = unfoldTransitions(startState);
+		assertEquals(newHashSet(
+			Tuple2.of("middle", StateTransitionAction.TAKE)
+		), startTransitions);
 
-		Collection<StateTransition<Event>> startTransitions = startState.getStateTransitions();
-		Map<String, StateTransition<Event>> startTransitionMap = new HashMap<>();
+		assertTrue(stateMap.containsKey("middle"));
+		State<Event> middleState = stateMap.get("middle");
+		final Set<Tuple2<String, StateTransitionAction>> middleTransitions = unfoldTransitions(middleState);
+		assertEquals(newHashSet(
+			Tuple2.of("middle", StateTransitionAction.IGNORE),
+			Tuple2.of("end", StateTransitionAction.TAKE)
+		), middleTransitions);
 
-		for (StateTransition<Event> transition: startTransitions) {
-			startTransitionMap.put(transition.getTargetState().getName(), transition);
-		}
+		assertTrue(stateMap.containsKey("end"));
+		State<Event> endState = stateMap.get("end");
+		final Set<Tuple2<String, StateTransitionAction>> endTransitions = unfoldTransitions(endState);
+		assertEquals(newHashSet(
+			Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE)
+		), endTransitions);
+
+		assertTrue(stateMap.containsKey(NFACompiler.ENDING_STATE_NAME));
+		State<Event> endingState = stateMap.get(NFACompiler.ENDING_STATE_NAME);
+		assertTrue(endingState.isFinal());
+		assertEquals(0, endingState.getStateTransitions().size());
+	}
 
-		assertEquals(2, startTransitionMap.size());
-		assertTrue(startTransitionMap.containsKey("start"));
+	@Test
+	public void testNFACompilerWithKleeneStar() {
 
-		StateTransition<Event> reflexiveTransition = startTransitionMap.get("start");
-		assertEquals(StateTransitionAction.IGNORE, reflexiveTransition.getAction());
+		Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
+			.followedBy("middle").subtype(SubEvent.class).zeroOrMore()
+			.followedBy("end").where(endFilter);
 
-		assertTrue(startTransitionMap.containsKey("middle"));
-		StateTransition<Event> startMiddleTransition = startTransitionMap.get("middle");
-		assertEquals(StateTransitionAction.TAKE, startMiddleTransition.getAction());
+		NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
 
-		assertTrue(stateMap.containsKey("middle"));
-		State<Event> middleState = stateMap.get("middle");
+		Set<State<Event>> states = nfa.getStates();
+		assertEquals(5, states.size());
 
-		Map<String, StateTransition<Event>> middleTransitionMap = new HashMap<>();
 
-		for (StateTransition<Event> transition: middleState.getStateTransitions()) {
-			middleTransitionMap.put(transition.getTargetState().getName(), transition);
+		Set<Tuple2<String, Set<Tuple2<String, StateTransitionAction>>>> stateMap = new HashSet<>();
+		for (State<Event> state : states) {
+			stateMap.add(Tuple2.of(state.getName(), unfoldTransitions(state)));
 		}
 
-		assertEquals(1, middleTransitionMap.size());
+		assertEquals(stateMap, newHashSet(
+			Tuple2.of("start", newHashSet(Tuple2.of("middle", StateTransitionAction.TAKE))),
+			Tuple2.of("middle", newHashSet(
+				Tuple2.of("middle", StateTransitionAction.IGNORE),
+				Tuple2.of("middle", StateTransitionAction.TAKE)
+			)),
+		    Tuple2.of("middle", newHashSet(
+			    Tuple2.of("middle", StateTransitionAction.IGNORE),
+			    Tuple2.of("middle", StateTransitionAction.TAKE),
+			    Tuple2.of("end", StateTransitionAction.PROCEED)
+		    )),
+			Tuple2.of("end", newHashSet(
+				Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE),
+				Tuple2.of("end", StateTransitionAction.IGNORE)
+			)),
+		    Tuple2.of(NFACompiler.ENDING_STATE_NAME, Sets.newHashSet())
+		));
 
-		assertTrue(middleTransitionMap.containsKey("end"));
-		StateTransition<Event> middleEndTransition = middleTransitionMap.get("end");
+	}
 
-		assertEquals(StateTransitionAction.TAKE, middleEndTransition.getAction());
 
-		assertTrue(stateMap.containsKey("end"));
-		State<Event> endState = stateMap.get("end");
-
-		assertTrue(endState.isFinal());
-		assertEquals(0, endState.getStateTransitions().size());
+	private <T> Set<Tuple2<String, StateTransitionAction>> unfoldTransitions(final State<T> state) {
+		final Set<Tuple2<String, StateTransitionAction>> transitions = new HashSet<>();
+		for (StateTransition<T> transition : state.getStateTransitions()) {
+			transitions.add(Tuple2.of(
+				transition.getTargetState().getName(),
+				transition.getAction()));
+		}
+		return transitions;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 98c3f5a..68b0419 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -184,4 +184,58 @@ public class PatternTest extends TestLogger {
 		assertEquals(previous2.getName(), "start");
 	}
 
+	@Test(expected = MalformedPatternException.class)
+	public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception {
+
+		Pattern.begin("start").where(new FilterFunction<Object>() {
+			@Override
+			public boolean filter(Object value) throws Exception {
+				return true;
+			}
+		}).oneOrMore().zeroOrMore();
+	}
+
+	@Test(expected = MalformedPatternException.class)
+	public void testPatternCanHaveQuantifierSpecifiedOnce2() throws Exception {
+
+		Pattern.begin("start").where(new FilterFunction<Object>() {
+			@Override
+			public boolean filter(Object value) throws Exception {
+				return true;
+			}
+		}).zeroOrMore().times(1);
+	}
+
+	@Test(expected = MalformedPatternException.class)
+	public void testPatternCanHaveQuantifierSpecifiedOnce3() throws Exception {
+
+		Pattern.begin("start").where(new FilterFunction<Object>() {
+			@Override
+			public boolean filter(Object value) throws Exception {
+				return true;
+			}
+		}).times(1).oneOrMore();
+	}
+
+	@Test(expected = MalformedPatternException.class)
+	public void testPatternCanHaveQuantifierSpecifiedOnce4() throws Exception {
+
+		Pattern.begin("start").where(new FilterFunction<Object>() {
+			@Override
+			public boolean filter(Object value) throws Exception {
+				return true;
+			}
+		}).oneOrMore().oneOrMore(true);
+	}
+
+	@Test(expected = MalformedPatternException.class)
+	public void testPatternCanHaveQuantifierSpecifiedOnce5() throws Exception {
+
+		Pattern.begin("start").where(new FilterFunction<Object>() {
+			@Override
+			public boolean filter(Object value) throws Exception {
+				return true;
+			}
+		}).oneOrMore(true).zeroOrMore(true);
+	}
 }