You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:17 UTC

[06/50] [abbrv] flink git commit: [FLINK-3318] Add support for quantifiers to CEP's pattern API

[FLINK-3318] Add support for quantifiers to CEP's pattern API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9001c4ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9001c4ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9001c4ef

Branch: refs/heads/table-retraction
Commit: 9001c4ef82a7a04d821252ac62bb7809a931c98a
Parents: d0695c0
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Sat Mar 18 20:53:00 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Thu Mar 23 10:47:55 2017 +0100

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |   78 +-
 .../flink/cep/scala/pattern/Pattern.scala       |   81 +-
 .../apache/flink/cep/nfa/ComputationState.java  |   44 +-
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |   17 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  398 ++++--
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |   25 +-
 .../java/org/apache/flink/cep/nfa/State.java    |   32 +-
 .../apache/flink/cep/nfa/StateTransition.java   |   20 +-
 .../flink/cep/nfa/StateTransitionAction.java    |    4 +-
 .../nfa/compiler/MalformedPatternException.java |   32 -
 .../flink/cep/nfa/compiler/NFACompiler.java     |  317 ++++-
 .../flink/cep/pattern/FilterFunctions.java      |   44 +
 .../cep/pattern/MalformedPatternException.java  |   32 +
 .../flink/cep/pattern/NotFilterFunction.java    |   42 +
 .../org/apache/flink/cep/pattern/Pattern.java   |  115 ++
 .../apache/flink/cep/pattern/Quantifier.java    |   54 +
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 1338 +++++++++++++++++-
 .../java/org/apache/flink/cep/nfa/NFATest.java  |   90 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java |  152 +-
 .../apache/flink/cep/pattern/PatternTest.java   |   54 +
 20 files changed, 2595 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 8047481..22cffbc 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -341,7 +341,45 @@ patternState.subtype(SubEvent.class);
 patternState.within(Time.seconds(10));
 {% endhighlight %}
           </td>
-      </tr>
+       </tr>
+       <tr>
+          <td><strong>ZeroOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or more times(kleene star). This means any number of events can be matched in this state.</p>
+              <p>If eagerness is enabled(by default) for a pattern A*B and sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.</p>
+      {% highlight java %}
+      patternState.zeroOrMore();
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>OneOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur one or more times(kleene star). This means at least one and at most infinite number of events can be matched in this state.</p>
+              <p>If eagerness is enabled (by default) for a pattern A*B and sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.</p>
+      {% highlight java %}
+      patternState.oneOrMore();
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Optional</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or once.</p>
+      {% highlight java %}
+      patternState.optional();
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Times</strong></td>
+          <td>
+              <p>Specifies exact number of times that this pattern should be matched.</p>
+      {% highlight java %}
+      patternState.times(2);
+      {% endhighlight %}
+          </td>
+       </tr>
   </tbody>
 </table>
 </div>
@@ -419,6 +457,44 @@ patternState.within(Time.seconds(10))
 {% endhighlight %}
           </td>
       </tr>
+       <tr>
+          <td><strong>ZeroOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or more times(kleene star). This means any number of events can be matched in this state.</p>
+              <p>If eagerness is enabled(by default) for a pattern A*B and sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.</p>
+      {% highlight scala %}
+      patternState.zeroOrMore()
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>OneOrMore</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur one or more times(kleene star). This means at least one and at most infinite number of events can be matched in this state.</p>
+              <p>If eagerness is enabled (by default) for a pattern A*B and sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.</p>
+      {% highlight scala %}
+      patternState.oneOrMore()
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Optional</strong></td>
+          <td>
+              <p>Specifies that this pattern can occur zero or once.</p>
+      {% highlight scala %}
+      patternState.optional()
+      {% endhighlight %}
+          </td>
+       </tr>
+       <tr>
+          <td><strong>Times</strong></td>
+          <td>
+              <p>Specifies exact number of times that this pattern should be matched.</p>
+      {% highlight scala %}
+      patternState.times(2)
+      {% endhighlight %}
+          </td>
+       </tr>
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index cc3b03c..5baf780 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -19,7 +19,7 @@ package org.apache.flink.cep.scala.pattern
 
 import org.apache.flink.api.common.functions.FilterFunction
 import org.apache.flink.cep
-import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.cep.pattern.{Quantifier, Pattern => JPattern}
 import org.apache.flink.streaming.api.windowing.time.Time
 
 /**
@@ -59,6 +59,12 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
 
   /**
     *
+    * @return currently applied quantifier to this pattern
+    */
+  def getQuantifier: Quantifier = jPattern.getQuantifier
+
+  /**
+    *
     * @return Filter condition for an event to be matched
     */
   def getFilterFunction(): Option[FilterFunction[F]] = {
@@ -160,6 +166,79 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     wrapPattern(jPattern.getPrevious())
   }
 
+  /**
+    * Specifies that this pattern can occur zero or more times(kleene star).
+    * This means any number of events can be matched in this state.
+    *
+    * @return The same pattern with applied Kleene star operator
+    */
+  def zeroOrMore: Pattern[T, F] = {
+    jPattern.zeroOrMore()
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur zero or more times(kleene star).
+    * This means any number of events can be matched in this state.
+    *
+    * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will generate patterns:
+    * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.
+    *
+    * @param eager if true the pattern always consumes earlier events
+    * @return The same pattern with applied Kleene star operator
+    */
+  def zeroOrMore(eager: Boolean): Pattern[T, F] = {
+    jPattern.zeroOrMore(eager)
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur one or more times(kleene star).
+    * This means at least one and at most infinite number of events can be matched in this state.
+    *
+    * @return The same pattern with applied Kleene plus operator
+    */
+  def oneOrMore: Pattern[T, F] = {
+    jPattern.oneOrMore()
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur one or more times(kleene star).
+    * This means at least one and at most infinite number of events can be matched in this state.
+    *
+    * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will generate patterns:
+    * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.
+    *
+    * @param eager if true the pattern always consumes earlier events
+    * @return The same pattern with applied Kleene plus operator
+    */
+  def oneOrMore(eager: Boolean): Pattern[T, F] = {
+    jPattern.oneOrMore(eager)
+    this
+  }
+
+  /**
+    * Specifies that this pattern can occur zero or once.
+    *
+    * @return The same pattern with applied Kleene ? operator
+    */
+  def optional: Pattern[T, F] = {
+    jPattern.optional()
+    this
+  }
+
+  /**
+    * Specifies exact number of times that this pattern should be matched.
+    *
+    * @param times number of times matching event must appear
+    * @return The same pattern with number of times applied
+    */
+  def times(times: Int): Pattern[T, F] = {
+    jPattern.times(times)
+    this
+  }
+
 }
 
 object Pattern {

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 3f44fba..445d038 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.util.Preconditions;
+
 /**
  * Helper class which encapsulates the state of the NFA computation. It points to the current state,
  * the last taken event, its occurrence timestamp, the current version and the starting timestamp
@@ -41,17 +43,21 @@ public class ComputationState<T> {
 	// Timestamp of the first element in the pattern
 	private final long startTimestamp;
 
-	public ComputationState(
-		final State<T> currentState,
-		final T event,
-		final long timestamp,
-		final DeweyNumber version,
-		final long startTimestamp) {
+	private final State<T> previousState;
+
+	private ComputationState(
+			final State<T> currentState,
+			final State<T> previousState,
+			final T event,
+			final long timestamp,
+			final DeweyNumber version,
+			final long startTimestamp) {
 		this.state = currentState;
 		this.event = event;
 		this.timestamp = timestamp;
 		this.version = version;
 		this.startTimestamp = startTimestamp;
+		this.previousState = previousState;
 	}
 
 	public boolean isFinalState() {
@@ -59,7 +65,7 @@ public class ComputationState<T> {
 	}
 
 	public boolean isStartState() {
-		return state.isStart();
+		return state.isStart() && event == null;
 	}
 
 	public long getTimestamp() {
@@ -74,6 +80,10 @@ public class ComputationState<T> {
 		return state;
 	}
 
+	public State<T> getPreviousState() {
+		return previousState;
+	}
+
 	public T getEvent() {
 		return event;
 	}
@@ -81,4 +91,24 @@ public class ComputationState<T> {
 	public DeweyNumber getVersion() {
 		return version;
 	}
+
+	public static <T> ComputationState<T> createStartState(final State<T> state) {
+		Preconditions.checkArgument(state.isStart());
+		return new ComputationState<>(state, null, null, -1L, new DeweyNumber(1), -1L);
+	}
+
+	public static <T> ComputationState<T> createStartState(final State<T> state, final DeweyNumber version) {
+		Preconditions.checkArgument(state.isStart());
+		return new ComputationState<>(state, null, null, -1L, version, -1L);
+	}
+
+	public static <T> ComputationState<T> createState(
+			final State<T> currentState,
+			final State<T> previousState,
+			final T event,
+			final long timestamp,
+			final DeweyNumber version,
+			final long startTimestamp) {
+		return new ComputationState<>(currentState, previousState, event, timestamp, version, startTimestamp);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index bb9039d..fd3fafa 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -44,6 +44,10 @@ public class DeweyNumber implements Serializable {
 		this.deweyNumber = deweyNumber;
 	}
 
+	public DeweyNumber(DeweyNumber number) {
+		this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length);
+	}
+
 	/**
 	 * Checks whether this dewey number is compatible to the other dewey number.
 	 *
@@ -90,8 +94,19 @@ public class DeweyNumber implements Serializable {
 	 * @return A new dewey number derived from this whose last digit is increased by one
 	 */
 	public DeweyNumber increase() {
+		return increase(1);
+	}
+
+	/**
+	 * Creates a new dewey number from this such that its last digit is increased by the supplied
+	 * number
+	 *
+	 * @param times how many times to increase the Dewey number
+	 * @return A new dewey number derived from this whose last digit is increased by given number
+	 */
+	public DeweyNumber increase(int times) {
 		int[] newDeweyNumber = Arrays.copyOf(deweyNumber, deweyNumber.length);
-		newDeweyNumber[deweyNumber.length - 1]++;
+		newDeweyNumber[deweyNumber.length - 1] += times;
 
 		return new DeweyNumber(newDeweyNumber);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 257418a..3d42248 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.LinkedHashMultimap;
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -39,7 +40,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -87,7 +87,7 @@ public class NFA<T> implements Serializable {
 	/**
 	 * 	Buffer used to store the matched events.
 	 */
-	private final SharedBuffer<State<T>, T> sharedBuffer;
+	private final SharedBuffer<String, T> sharedBuffer;
 
 	/**
 	 * A set of all the valid NFA states, as returned by the
@@ -98,7 +98,7 @@ public class NFA<T> implements Serializable {
 
 	/**
 	 * The length of a windowed pattern, as specified using the
-	 * {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)}
+	 * {@link org.apache.flink.cep.pattern.Pattern#within(Time)}  Pattern.within(Time)}
 	 * method.
 	 */
 	private final long windowTime;
@@ -109,9 +109,6 @@ public class NFA<T> implements Serializable {
 	 */
 	private final boolean handleTimeout;
 
-	// Current starting index for the next dewey version number
-	private int startEventCounter;
-
 	/**
 	 * Current set of {@link ComputationState computation states} within the state machine.
 	 * These are the "active" intermediate states that are waiting for new matching
@@ -119,8 +116,6 @@ public class NFA<T> implements Serializable {
 	 */
 	private transient Queue<ComputationState<T>> computationStates;
 
-	private StateTransitionComparator<T>  stateTransitionComparator;
-
 	public NFA(
 			final TypeSerializer<T> eventSerializer,
 			final long windowTime,
@@ -129,11 +124,10 @@ public class NFA<T> implements Serializable {
 		this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
 		this.windowTime = windowTime;
 		this.handleTimeout = handleTimeout;
-		this.sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
-		this.computationStates = new LinkedList<>();
-		this.states = new HashSet<>();
-		this.startEventCounter = 1;
-		this.stateTransitionComparator =  new StateTransitionComparator<>();
+		sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+		computationStates = new LinkedList<>();
+
+		states = new HashSet<>();
 	}
 
 	public Set<State<T>> getStates() {
@@ -150,7 +144,7 @@ public class NFA<T> implements Serializable {
 		states.add(state);
 
 		if (state.isStart()) {
-			computationStates.add(new ComputationState<>(state, null, -1L, null, -1L));
+			computationStates.add(ComputationState.createStartState(state));
 		}
 	}
 
@@ -201,8 +195,8 @@ public class NFA<T> implements Serializable {
 				}
 
 				// remove computation state which has exceeded the window length
-				sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
-				sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
+				sharedBuffer.release(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
+				sharedBuffer.remove(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
 
 				newComputationStates = Collections.emptyList();
 			} else if (event != null) {
@@ -218,8 +212,8 @@ public class NFA<T> implements Serializable {
 					result.addAll(matches);
 
 					// remove found patterns because they are no longer needed
-					sharedBuffer.release(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp());
-					sharedBuffer.remove(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp());
+					sharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+					sharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
 				} else {
 					// add new computation state; it will be processed once the next event arrives
 					computationStates.add(newComputationState);
@@ -252,8 +246,7 @@ public class NFA<T> implements Serializable {
 			return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
 				sharedBuffer.equals(other.sharedBuffer) &&
 				states.equals(other.states) &&
-				windowTime == other.windowTime &&
-				startEventCounter == other.startEventCounter;
+				windowTime == other.windowTime;
 		} else {
 			return false;
 		}
@@ -261,12 +254,80 @@ public class NFA<T> implements Serializable {
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime, startEventCounter);
+		return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime);
+	}
+
+	private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
+		return s1.getName().equals(s2.getName());
 	}
 
 	/**
+	 * Class for storing resolved transitions. It counts at insert time the number of
+	 * branching transitions both for IGNORE and TAKE actions.
+ 	 */
+	private static class OutgoingEdges<T> {
+		private List<StateTransition<T>> edges = new ArrayList<>();
+
+		private final State<T> currentState;
+
+		private int totalTakeBranches = 0;
+		private int totalIgnoreBranches = 0;
+
+		OutgoingEdges(final State<T> currentState) {
+			this.currentState = currentState;
+		}
+
+		void add(StateTransition<T> edge) {
+
+			if (!isSelfIgnore(edge)) {
+				if (edge.getAction() == StateTransitionAction.IGNORE) {
+					totalIgnoreBranches++;
+				} else if (edge.getAction() == StateTransitionAction.TAKE) {
+					totalTakeBranches++;
+				}
+			}
+
+			edges.add(edge);
+		}
+
+		int getTotalIgnoreBranches() {
+			return totalIgnoreBranches;
+		}
+		int getTotalTakeBranches() {
+			return totalTakeBranches;
+		}
+
+		List<StateTransition<T>> getEdges() {
+			return edges;
+		}
+
+		private boolean isSelfIgnore(final StateTransition<T> edge) {
+			return isEquivalentState(edge.getTargetState(), currentState) &&
+				edge.getAction() == StateTransitionAction.IGNORE;
+		}
+	}
+
+
+	/**
 	 * Computes the next computation states based on the given computation state, the current event,
-	 * its timestamp and the internal state machine.
+	 * its timestamp and the internal state machine. The algorithm is:
+	 *
+	 * 1. Decide on valid transitions and number of branching paths. See {@link OutgoingEdges}
+	 * 2. Perform transitions:
+	 *      a) IGNORE (links in {@link SharedBuffer} will still point to the previous event)
+	 *          - do not perform for Start State - special case
+	 *          - if stays in the same state increase the current stage for future use with number of
+	 *            outgoing edges
+	 *          - if after PROCEED increase current stage and add new stage (as we change the state)
+	 *          - lock the entry in {@link SharedBuffer} as it is needed in the created branch
+	 *      b) TAKE (links in {@link SharedBuffer} will point to the current event)
+	 *          - add entry to the shared buffer with version of the current computation state
+	 *          - add stage and then increase with number of takes for the future computation states
+	 *          - peek to the next state if it has PROCEED path to a Final State, if true create
+	 *            Final ComputationState to emit results
+	 * 3. Handle the Start State, as it always have to remain
+	 * 4. Release the corresponding entries in {@link SharedBuffer}.
+	 *
 	 *
 	 * @param computationState Current computation state
 	 * @param event Current event which is processed
@@ -277,31 +338,179 @@ public class NFA<T> implements Serializable {
 			final ComputationState<T> computationState,
 			final T event,
 			final long timestamp) {
-		Stack<State<T>> states = new Stack<>();
-		List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
-		State<T> state = computationState.getState();
 
-		states.push(state);
+		final OutgoingEdges<T> outgoingEdges = createDecisionGraph(computationState, event);
+
+		// Create the computing version based on the previously computed edges
+		// We need to defer the creation of computation states until we know how many edges start
+		// at this computation state so that we can assign proper version
+		final List<StateTransition<T>> edges = outgoingEdges.getEdges();
+		int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
+		int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
+
+		final List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
+		for (StateTransition<T> edge : edges) {
+			switch (edge.getAction()) {
+				case IGNORE: {
+					if (!computationState.isStartState()) {
+						final DeweyNumber version;
+						if (isEquivalentState(edge.getTargetState(), computationState.getState())) {
+							//Stay in the same state (it can be either looping one or singleton)
+							final int toIncrease = calculateIncreasingSelfState(
+								outgoingEdges.getTotalIgnoreBranches(),
+								outgoingEdges.getTotalTakeBranches());
+							version = computationState.getVersion().increase(toIncrease);
+						} else {
+							//IGNORE after PROCEED
+							version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+							ignoreBranchesToVisit--;
+						}
 
-		boolean branched = false;
-		while (!states.isEmpty()) {
-			State<T> currentState = states.pop();
-			final List<StateTransition<T>> stateTransitions = new ArrayList<>(currentState.getStateTransitions());
+						resultingComputationStates.add(
+							ComputationState.createState(
+								edge.getTargetState(),
+								computationState.getPreviousState(),
+								computationState.getEvent(),
+								computationState.getTimestamp(),
+								version,
+								computationState.getStartTimestamp()
+							)
+						);
+						sharedBuffer.lock(
+							computationState.getPreviousState().getName(),
+							computationState.getEvent(),
+							computationState.getTimestamp());
+					}
+				}
+				break;
+				case TAKE:
+					final State<T> newState = edge.getTargetState();
+					final State<T> consumingState = edge.getSourceState();
+					final State<T> previousEventState = computationState.getPreviousState();
+
+					final T previousEvent = computationState.getEvent();
+					final DeweyNumber currentVersion = computationState.getVersion();
+
+					final DeweyNumber newComputationStateVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+					takeBranchesToVisit--;
+
+					final long startTimestamp;
+					if (computationState.isStartState()) {
+						startTimestamp = timestamp;
+						sharedBuffer.put(
+							consumingState.getName(),
+							event,
+							timestamp,
+							currentVersion);
+					} else {
+						startTimestamp = computationState.getStartTimestamp();
+						sharedBuffer.put(
+							consumingState.getName(),
+							event,
+							timestamp,
+							previousEventState.getName(),
+							previousEvent,
+							computationState.getTimestamp(),
+							currentVersion);
+					}
 
-			// this is for when we restore from legacy. In that case, the comparator is null
-			// as it did not exist in the previous Flink versions, so we have to initialize it here.
+					// a new computation state is referring to the shared entry
+					sharedBuffer.lock(consumingState.getName(), event, timestamp);
+
+					resultingComputationStates.add(ComputationState.createState(
+						newState,
+						consumingState,
+						event,
+						timestamp,
+						newComputationStateVersion,
+						startTimestamp
+					));
+
+					//check if newly created state is optional (have a PROCEED path to Final state)
+					final State<T> finalState = findFinalStateAfterProceed(newState, event);
+					if (finalState != null) {
+						sharedBuffer.lock(consumingState.getName(), event, timestamp);
+						resultingComputationStates.add(ComputationState.createState(
+							finalState,
+							consumingState,
+							event,
+							timestamp,
+							newComputationStateVersion,
+							startTimestamp));
+					}
+					break;
+			}
+		}
 
-			if (stateTransitionComparator == null) {
-				stateTransitionComparator = new StateTransitionComparator();
+		if (computationState.isStartState()) {
+			final int totalBranches = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(), outgoingEdges.getTotalTakeBranches());
+			final ComputationState<T> startState = createStartState(computationState, totalBranches);
+			resultingComputationStates.add(startState);
+		}
+
+		if (computationState.getEvent() != null) {
+			// release the shared entry referenced by the current computation state.
+			sharedBuffer.release(
+				computationState.getPreviousState().getName(),
+				computationState.getEvent(),
+				computationState.getTimestamp());
+			// try to remove unnecessary shared buffer entries
+			sharedBuffer.remove(
+				computationState.getPreviousState().getName(),
+				computationState.getEvent(),
+				computationState.getTimestamp());
+		}
+
+		return resultingComputationStates;
+	}
+
+	private State<T> findFinalStateAfterProceed(State<T> state, T event) {
+		final Stack<State<T>> statesToCheck = new Stack<>();
+		statesToCheck.push(state);
+
+		try {
+			while (!statesToCheck.isEmpty()) {
+				final State<T> currentState = statesToCheck.pop();
+				for (StateTransition<T> transition : currentState.getStateTransitions()) {
+					if (transition.getAction() == StateTransitionAction.PROCEED &&
+						checkFilterCondition(transition.getCondition(), event)) {
+						if (transition.getTargetState().isFinal()) {
+							return transition.getTargetState();
+						} else {
+							statesToCheck.push(transition.getTargetState());
+						}
+					}
+				}
 			}
+		} catch (Exception e) {
+			throw new RuntimeException("Failure happened in filter function.", e);
+		}
+
+		return null;
+	}
+
+	private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
+		return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1;
+	}
+
+	private ComputationState<T> createStartState(final ComputationState<T> computationState, final int totalBranches) {
+		final DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
+		return ComputationState.createStartState(computationState.getState(), startVersion);
+	}
 
-			// impose the IGNORE will be processed last
-			Collections.sort(stateTransitions, stateTransitionComparator);
+	private OutgoingEdges<T> createDecisionGraph(ComputationState<T> computationState, T event) {
+		final Stack<State<T>> states = new Stack<>();
+		states.push(computationState.getState());
+		final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(computationState.getState());
+		//First create all outgoing edges, so to be able to reason about the Dewey version
+		while (!states.isEmpty()) {
+			State<T> currentState = states.pop();
+			Collection<StateTransition<T>> stateTransitions = currentState.getStateTransitions();
 
 			// check all state transitions for each state
-			for (StateTransition<T> stateTransition: stateTransitions) {
+			for (StateTransition<T> stateTransition : stateTransitions) {
 				try {
-					if (stateTransition.getCondition() == null || stateTransition.getCondition().filter(event)) {
+					if (checkFilterCondition(stateTransition.getCondition(), event)) {
 						// filter condition is true
 						switch (stateTransition.getAction()) {
 							case PROCEED:
@@ -310,73 +519,8 @@ public class NFA<T> implements Serializable {
 								states.push(stateTransition.getTargetState());
 								break;
 							case IGNORE:
-								final DeweyNumber version;
-								if (branched) {
-									version = computationState.getVersion().increase();
-								} else {
-									version = computationState.getVersion();
-								}
-								resultingComputationStates.add(new ComputationState<T>(
-									computationState.getState(),
-									computationState.getEvent(),
-									computationState.getTimestamp(),
-									version,
-									computationState.getStartTimestamp()));
-
-								// we have a new computation state referring to the same the shared entry
-								// the lock of the current computation is released later on
-								sharedBuffer.lock(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
-								break;
 							case TAKE:
-								final State<T> newState = stateTransition.getTargetState();
-								final DeweyNumber oldVersion;
-								final DeweyNumber newComputationStateVersion;
-								final State<T> previousState = computationState.getState();
-								final T previousEvent = computationState.getEvent();
-								final long previousTimestamp;
-								final long startTimestamp;
-
-								if (computationState.isStartState()) {
-									oldVersion = new DeweyNumber(startEventCounter++);
-									newComputationStateVersion = oldVersion.addStage();
-									startTimestamp = timestamp;
-									previousTimestamp = -1L;
-
-								} else {
-									startTimestamp = computationState.getStartTimestamp();
-									previousTimestamp = computationState.getTimestamp();
-									oldVersion = computationState.getVersion();
-
-									branched = true;
-									newComputationStateVersion = oldVersion.addStage();
-								}
-
-								if (previousState.isStart()) {
-									sharedBuffer.put(
-										newState,
-										event,
-										timestamp,
-										oldVersion);
-								} else {
-									sharedBuffer.put(
-										newState,
-										event,
-										timestamp,
-										previousState,
-										previousEvent,
-										previousTimestamp,
-										oldVersion);
-								}
-
-								// a new computation state is referring to the shared entry
-								sharedBuffer.lock(newState, event, timestamp);
-
-								resultingComputationStates.add(new ComputationState<T>(
-									newState,
-									event,
-									timestamp,
-									newComputationStateVersion,
-									startTimestamp));
+								outgoingEdges.add(stateTransition);
 								break;
 						}
 					}
@@ -385,19 +529,12 @@ public class NFA<T> implements Serializable {
 				}
 			}
 		}
+		return outgoingEdges;
+	}
 
-		if (computationState.isStartState()) {
-			// a computation state is always kept if it refers to a starting state because every
-			// new element can start a new pattern
-			resultingComputationStates.add(computationState);
-		} else {
-			// release the shared entry referenced by the current computation state.
-			sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
-			// try to remove unnecessary shared buffer entries
-			sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
-		}
 
-		return resultingComputationStates;
+	private boolean checkFilterCondition(FilterFunction<T> condition, T event) throws Exception {
+		return condition == null || condition.filter(event);
 	}
 
 	/**
@@ -409,8 +546,8 @@ public class NFA<T> implements Serializable {
 	 * @return Collection of event sequences which end in the given computation state
 	 */
 	private Collection<Map<String, T>> extractPatternMatches(final ComputationState<T> computationState) {
-		Collection<LinkedHashMultimap<State<T>, T>> paths = sharedBuffer.extractPatterns(
-			computationState.getState(),
+		Collection<LinkedHashMultimap<String, T>> paths = sharedBuffer.extractPatterns(
+			computationState.getPreviousState().getName(),
 			computationState.getEvent(),
 			computationState.getTimestamp(),
 			computationState.getVersion());
@@ -420,19 +557,20 @@ public class NFA<T> implements Serializable {
 		TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer();
 
 		// generate the correct names from the collection of LinkedHashMultimaps
-		for (LinkedHashMultimap<State<T>, T> path: paths) {
+		for (LinkedHashMultimap<String, T> path: paths) {
 			Map<String, T> resultPath = new HashMap<>();
-			for (State<T> key: path.keySet()) {
+			for (String key: path.keySet()) {
 				int counter = 0;
 				Set<T> events = path.get(key);
 
 				// we iterate over the elements in insertion order
 				for (T event: events) {
 					resultPath.put(
-						events.size() > 1 ? generateStateName(key.getName(), counter): key.getName(),
+						events.size() > 1 ? generateStateName(key, counter): key,
 						// copy the element so that the user can change it
 						serializer.isImmutableType() ? event : serializer.copy(event)
 					);
+					counter++;
 				}
 			}
 
@@ -472,6 +610,7 @@ public class NFA<T> implements Serializable {
 
 	private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException {
 		oos.writeObject(computationState.getState());
+		oos.writeObject(computationState.getPreviousState());
 		oos.writeLong(computationState.getTimestamp());
 		oos.writeObject(computationState.getVersion());
 		oos.writeLong(computationState.getStartTimestamp());
@@ -490,6 +629,7 @@ public class NFA<T> implements Serializable {
 	@SuppressWarnings("unchecked")
 	private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		final State<T> state = (State<T>)ois.readObject();
+		final State<T> previousState = (State<T>)ois.readObject();
 		final long timestamp = ois.readLong();
 		final DeweyNumber version = (DeweyNumber)ois.readObject();
 		final long startTimestamp = ois.readLong();
@@ -504,7 +644,7 @@ public class NFA<T> implements Serializable {
 			event = null;
 		}
 
-		return new ComputationState<>(state, event, timestamp, version, startTimestamp);
+		return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp);
 	}
 
 	/**
@@ -629,20 +769,4 @@ public class NFA<T> implements Serializable {
 			return getClass().hashCode();
 		}
 	}
-
-	/**
-	 * Comparator used for imposing the assumption that IGNORE is always the last StateTransition in a state.
-	 */
-	private static final class StateTransitionComparator<T> implements Serializable, Comparator<StateTransition<T>> {
-
-		private static final long serialVersionUID = -2775474935413622278L;
-
-		@Override
-		public int compare(final StateTransition<T> o1, final StateTransition<T> o2) {
-			if (o1.getAction() == o2.getAction()) {
-				return 0;
-			}
-			return o1.getAction() == StateTransitionAction.IGNORE ? 1 : -1;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index b7e288b..e6a8c75 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -212,28 +212,27 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		SharedBufferEntry<K, V> entry = get(key, value, timestamp);
 
 		if (entry != null) {
-			extractionStates.add(new ExtractionState<K, V>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
+			extractionStates.add(new ExtractionState<>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
 
 			// use a depth first search to reconstruct the previous relations
 			while (!extractionStates.isEmpty()) {
-				ExtractionState<K, V> extractionState = extractionStates.pop();
-				DeweyNumber currentVersion = extractionState.getVersion();
+				final ExtractionState<K, V> extractionState = extractionStates.pop();
 				// current path of the depth first search
-				Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
+				final Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
+				final SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
 
 				// termination criterion
-				if (currentVersion.length() == 1) {
-					LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create();
+				if (currentEntry == null) {
+					final LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create();
 
 					while(!currentPath.isEmpty()) {
-						SharedBufferEntry<K, V> currentEntry = currentPath.pop();
+						final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop();
 
-						completePath.put(currentEntry.getKey(), currentEntry.getValueTime().getValue());
+						completePath.put(currentPathEntry.getKey(), currentPathEntry.getValueTime().getValue());
 					}
 
 					result.add(completePath);
 				} else {
-					SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
 
 					// append state to the path
 					currentPath.push(currentEntry);
@@ -242,17 +241,18 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 					for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
 						// we can only proceed if the current version is compatible to the version
 						// of this previous relation
+						final DeweyNumber currentVersion = extractionState.getVersion();
 						if (currentVersion.isCompatibleWith(edge.getVersion())) {
 							if (firstMatch) {
 								// for the first match we don't have to copy the current path
-								extractionStates.push(new ExtractionState<K, V>(edge.getTarget(), edge.getVersion(), currentPath));
+								extractionStates.push(new ExtractionState<>(edge.getTarget(), edge.getVersion(), currentPath));
 								firstMatch = false;
 							} else {
-								Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
+								final Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
 								copy.addAll(currentPath);
 
 								extractionStates.push(
-									new ExtractionState<K, V>(
+									new ExtractionState<>(
 										edge.getTarget(),
 										edge.getVersion(),
 										copy));
@@ -260,6 +260,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 						}
 					}
 				}
+
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 50b2cf3..7bcb6ea 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.functions.FilterFunction;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,7 +45,7 @@ public class State<T> implements Serializable {
 		this.name = name;
 		this.stateType = stateType;
 
-		stateTransitions = new ArrayList<StateTransition<T>>();
+		stateTransitions = new ArrayList<>();
 	}
 
 	public boolean isFinal() {
@@ -60,8 +62,32 @@ public class State<T> implements Serializable {
 		return stateTransitions;
 	}
 
-	public void addStateTransition(final StateTransition<T> stateTransition) {
-		stateTransitions.add(stateTransition);
+
+	private void addStateTransition(
+		final StateTransitionAction action,
+		final State<T> targetState,
+		final FilterFunction<T> condition) {
+		stateTransitions.add(new StateTransition<T>(this, action, targetState, condition));
+	}
+
+	public void addIgnore(final FilterFunction<T> condition) {
+		addStateTransition(StateTransitionAction.IGNORE, this, condition);
+	}
+
+	public void addIgnore(final State<T> targetState,final FilterFunction<T> condition) {
+		addStateTransition(StateTransitionAction.IGNORE, targetState, condition);
+	}
+
+	public void addTake(final State<T> targetState, final FilterFunction<T> condition) {
+		addStateTransition(StateTransitionAction.TAKE, targetState, condition);
+	}
+
+	public void addProceed(final State<T> targetState, final FilterFunction<T> condition) {
+		addStateTransition(StateTransitionAction.PROCEED, targetState, condition);
+	}
+
+	public void addTake(final FilterFunction<T> condition) {
+		addStateTransition(StateTransitionAction.TAKE, this, condition);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
index 479f28a..e3c7b7a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -27,12 +27,18 @@ public class StateTransition<T> implements Serializable {
 	private static final long serialVersionUID = -4825345749997891838L;
 
 	private final StateTransitionAction action;
+	private final State<T> sourceState;
 	private final State<T> targetState;
 	private final FilterFunction<T> condition;
 
-	public StateTransition(final StateTransitionAction action, final State<T> targetState, final FilterFunction<T> condition) {
+	public StateTransition(
+		final State<T> sourceState,
+		final StateTransitionAction action,
+		final State<T> targetState,
+		final FilterFunction<T> condition) {
 		this.action = action;
 		this.targetState = targetState;
+		this.sourceState = sourceState;
 		this.condition = condition;
 	}
 
@@ -44,6 +50,10 @@ public class StateTransition<T> implements Serializable {
 		return targetState;
 	}
 
+	public State<T> getSourceState() {
+		return sourceState;
+	}
+
 	public FilterFunction<T> getCondition() {
 		return condition;
 	}
@@ -55,6 +65,7 @@ public class StateTransition<T> implements Serializable {
 			StateTransition<T> other = (StateTransition<T>) obj;
 
 			return action == other.action &&
+				sourceState.getName().equals(other.sourceState.getName()) &&
 				targetState.getName().equals(other.targetState.getName());
 		} else {
 			return false;
@@ -64,14 +75,17 @@ public class StateTransition<T> implements Serializable {
 	@Override
 	public int hashCode() {
 		// we have to take the name of targetState because the transition might be reflexive
-		return Objects.hash(action, targetState.getName());
+		return Objects.hash(action, targetState.getName(), sourceState.getName());
 	}
 
 	@Override
 	public String toString() {
 		StringBuilder builder = new StringBuilder();
 
-		builder.append("StateTransition(").append(action).append(", ").append(targetState.getName());
+		builder.append("StateTransition(")
+			.append(action).append(", ")
+			.append(sourceState.getName()).append(", ")
+			.append(targetState.getName());
 
 		if (condition != null) {
 			builder.append(", with filter)");

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
index 70fc7fb..b8ca4e8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
@@ -22,7 +22,7 @@ package org.apache.flink.cep.nfa;
  * Set of actions when doing a state transition from a {@link State} to another.
  */
 public enum StateTransitionAction {
-	TAKE, // take the current event and assign it to the new state
-	IGNORE, // ignore the current event and do the state transition
+	TAKE, // take the current event and assign it to the current state
+	IGNORE, // ignore the current event
 	PROCEED // do the state transition and keep the current event for further processing (epsilon transition)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
deleted file mode 100644
index a3bb5f4..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa.compiler;
-
-/**
- * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern}
- * was not specified correctly.
- */
-public class MalformedPatternException extends RuntimeException {
-
-	private static final long serialVersionUID = 7751134834983361543L;
-
-	public MalformedPatternException(String message) {
-		super(message);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 18ed21f..b476c49 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -22,18 +22,21 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
-import org.apache.flink.cep.nfa.StateTransition;
-import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.FilterFunctions;
 import org.apache.flink.cep.pattern.FollowedByPattern;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.NotFilterFunction;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -42,7 +45,7 @@ import java.util.Set;
  */
 public class NFACompiler {
 
-	protected final static String BEGINNING_STATE_NAME = "$beginningState$";
+	protected static final String ENDING_STATE_NAME = "$endState$";
 
 	/**
 	 * Compiles the given pattern into a {@link NFA}.
@@ -74,88 +77,288 @@ public class NFACompiler {
 	 */
 	@SuppressWarnings("unchecked")
 	public static <T> NFAFactory<T> compileFactory(
-		Pattern<T, ?> pattern,
-		TypeSerializer<T> inputTypeSerializer,
+		final Pattern<T, ?> pattern,
+		final TypeSerializer<T> inputTypeSerializer,
 		boolean timeoutHandling) {
 		if (pattern == null) {
 			// return a factory for empty NFAs
-			return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
+			return new NFAFactoryImpl<>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
 		} else {
-			// set of all generated states
-			Map<String, State<T>> states = new HashMap<>();
-			long windowTime;
+			final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
+			nfaFactoryCompiler.compileFactory();
+			return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
+		}
+	}
+
+	/**
+	 * Converts a {@link Pattern} into graph of {@link State}. It enables sharing of
+	 * compilation state across methods.
+	 *
+	 * @param <T>
+	 */
+	private static class NFAFactoryCompiler<T> {
+
+		private final Set<String> usedNames = new HashSet<>();
+		private final List<State<T>> states = new ArrayList<>();
 
-			// this is used to enforse pattern name uniqueness.
-			Set<String> patternNames = new HashSet<>();
+		private long windowTime = 0;
+		private Pattern<T, ?> currentPattern;
 
-			Pattern<T, ?> succeedingPattern;
-			State<T> succeedingState;
-			Pattern<T, ?> currentPattern = pattern;
+		NFAFactoryCompiler(final Pattern<T, ?> pattern) {
+			this.currentPattern = pattern;
+		}
 
+		/**
+		 * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create
+		 * multiple NFAs.
+		 */
+		void compileFactory() {
 			// we're traversing the pattern from the end to the beginning --> the first state is the final state
-			State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final);
-			patternNames.add(currentPattern.getName());
+			State<T> sinkState = createEndingState();
+			// add all the normal states
+			sinkState = createMiddleStates(sinkState);
+			// add the beginning state
+			createStartState(sinkState);
+		}
 
-			states.put(currentPattern.getName(), currentState);
+		List<State<T>> getStates() {
+			return states;
+		}
+
+		long getWindowTime() {
+			return windowTime;
+		}
+
+		/**
+		 * Creates the dummy Final {@link State} of the NFA graph.
+		 * @return dummy Final state
+		 */
+		private State<T> createEndingState() {
+			State<T> endState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
+			states.add(endState);
+			usedNames.add(ENDING_STATE_NAME);
 
 			windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() : 0L;
+			return endState;
+		}
 
-			while (currentPattern.getPrevious() != null) {
-				succeedingPattern = currentPattern;
-				succeedingState = currentState;
-				currentPattern = currentPattern.getPrevious();
+		/**
+		 * Creates all the states between Start and Final state.
+		 * @param sinkState the state that last state should point to (always the Final state)
+		 * @return the next state after Start in the resulting graph
+		 */
+		private State<T> createMiddleStates(final State<T> sinkState) {
 
-				if (!patternNames.add(currentPattern.getName())) {
-					throw new MalformedPatternException("Duplicate pattern name: " + currentPattern.getName() + ". " +
-						"Pattern names must be unique.");
+			State<T> lastSink = sinkState;
+			while (currentPattern.getPrevious() != null) {
+				checkPatternNameUniqueness();
+
+				State<T> sourceState = new State<>(currentPattern.getName(), State.StateType.Normal);
+				states.add(sourceState);
+				usedNames.add(sourceState.getName());
+
+				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
+					convertToLooping(sourceState, lastSink);
+
+					if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+						sourceState = createFirstMandatoryStateOfLoop(sourceState, State.StateType.Normal);
+						states.add(sourceState);
+						usedNames.add(sourceState.getName());
+					}
+				} else if (currentPattern.getQuantifier() == Quantifier.TIMES) {
+					sourceState = convertToTimesState(sourceState, lastSink, currentPattern.getTimes());
+				} else {
+					convertToSingletonState(sourceState, lastSink);
 				}
 
-				Time currentWindowTime = currentPattern.getWindowTime();
+				currentPattern = currentPattern.getPrevious();
+				lastSink = sourceState;
 
+				final Time currentWindowTime = currentPattern.getWindowTime();
 				if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
 					// the window time is the global minimum of all window times of each state
 					windowTime = currentWindowTime.toMilliseconds();
 				}
+			}
+
+			return lastSink;
+		}
+
+		private void checkPatternNameUniqueness() {
+			if (usedNames.contains(currentPattern.getName())) {
+				throw new MalformedPatternException(
+					"Duplicate pattern name: " + currentPattern.getName() + ". " +
+					"Pattern names must be unique.");
+			}
+		}
+
+		/**
+		 * Creates the Start {@link State} of the resulting NFA graph.
+		 * @param sinkState the state that Start state should point to (alwyas first state of middle states)
+		 * @return created state
+		 */
+		@SuppressWarnings("unchecked")
+		private State<T> createStartState(State<T> sinkState) {
+			checkPatternNameUniqueness();
+
+			final State<T> beginningState;
+			if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
+				final State<T> loopingState;
+				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+					loopingState = new State<>(currentPattern.getName(), State.StateType.Normal);
+					beginningState = createFirstMandatoryStateOfLoop(loopingState, State.StateType.Start);
+					states.add(loopingState);
+				} else {
+					loopingState = new State<>(currentPattern.getName(), State.StateType.Start);
+					beginningState = loopingState;
+				}
+				convertToLooping(loopingState, sinkState, true);
+			} else  {
+				if (currentPattern.getQuantifier() == Quantifier.TIMES && currentPattern.getTimes() > 1) {
+					final State<T> timesState = new State<>(currentPattern.getName(), State.StateType.Normal);
+					states.add(timesState);
+					sinkState = convertToTimesState(timesState, sinkState, currentPattern.getTimes() - 1);
+				}
 
-				if (states.containsKey(currentPattern.getName())) {
-					currentState = states.get(currentPattern.getName());
+				beginningState = new State<>(currentPattern.getName(), State.StateType.Start);
+				convertToSingletonState(beginningState, sinkState);
+			}
+
+			states.add(beginningState);
+			usedNames.add(beginningState.getName());
+
+			return beginningState;
+		}
+
+		/**
+		 * Converts the given state into a "complex" state consisting of given number of states with
+		 * same {@link FilterFunction}
+		 *
+		 * @param sourceState the state to be converted
+		 * @param sinkState the state that the converted state should point to
+		 * @param times number of times the state should be copied
+		 * @return the first state of the "complex" state, next state should point to it
+		 */
+		private State<T> convertToTimesState(final State<T> sourceState, final State<T> sinkState, int times) {
+			convertToSingletonState(sourceState, sinkState);
+			State<T> lastSink;
+			State<T> firstState = sourceState;
+			for (int i = 0; i < times - 1; i++) {
+				lastSink = firstState;
+				firstState = new State<>(currentPattern.getName(), State.StateType.Normal);
+				states.add(firstState);
+				convertToSingletonState(firstState, lastSink);
+			}
+			return firstState;
+		}
+
+		/**
+		 * Converts the given state into a simple single state. For an OPTIONAL state it also consists
+		 * of a similar state without the PROCEED edge, so that for each PROCEED transition branches
+		 * in computation state graph  can be created only once.
+		 *
+		 * @param sourceState the state to be converted
+		 * @param sinkState state that the state being converted should point to
+		 */
+		@SuppressWarnings("unchecked")
+		private void convertToSingletonState(final State<T> sourceState, final State<T> sinkState) {
+
+			final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+			final FilterFunction<T> trueFunction = FilterFunctions.trueFunction();
+			sourceState.addTake(sinkState, currentFilterFunction);
+
+			if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
+				sourceState.addProceed(sinkState, trueFunction);
+			}
+
+			if (currentPattern instanceof FollowedByPattern) {
+				final State<T> ignoreState;
+				if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
+					ignoreState = new State<>(currentPattern.getName(), State.StateType.Normal);
+					ignoreState.addTake(sinkState, currentFilterFunction);
+					states.add(ignoreState);
 				} else {
-					currentState = new State<>(currentPattern.getName(), State.StateType.Normal);
-					states.put(currentState.getName(), currentState);
+					ignoreState = sourceState;
 				}
+				sourceState.addIgnore(ignoreState, trueFunction);
+			}
+		}
 
-				currentState.addStateTransition(new StateTransition<T>(
-					StateTransitionAction.TAKE,
-					succeedingState,
-					(FilterFunction<T>) succeedingPattern.getFilterFunction()));
-
-				if (succeedingPattern instanceof FollowedByPattern) {
-					// the followed by pattern entails a reflexive ignore transition
-					currentState.addStateTransition(new StateTransition<T>(
-						StateTransitionAction.IGNORE,
-						currentState,
-						null
-					));
+		/**
+		 * Patterns with quantifiers AT_LEAST_ONE_* are converted into pair of states: a singleton state and
+		 * looping state. This method creates the first of the two.
+		 *
+		 * @param sinkState the state the newly created state should point to, it should be a looping state
+		 * @param stateType the type of the created state, as the NFA graph can also start wit AT_LEAST_ONE_*
+		 * @return the newly created state
+		 */
+		@SuppressWarnings("unchecked")
+		private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState, final State.StateType stateType) {
+
+			final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+			final State<T> firstState = new State<>(currentPattern.getName(), stateType);
+
+			firstState.addTake(sinkState, currentFilterFunction);
+			if (currentPattern instanceof FollowedByPattern) {
+				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
+					firstState.addIgnore(new NotFilterFunction<>(currentFilterFunction));
+				} else {
+					firstState.addIgnore(FilterFunctions.<T>trueFunction());
 				}
 			}
+			return firstState;
+		}
 
-			// add the beginning state
-			final State<T> beginningState;
+		/**
+		 * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
+		 * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
+		 * for each PROCEED transition branches in computation state graph  can be created only once.
+		 *
+		 * <p>If this looping state is first of a graph we should treat the {@link Pattern} as {@link FollowedByPattern}
+		 * to enable combinations.
+		 *
+		 * @param sourceState  the state to converted
+		 * @param sinkState    the state that the converted state should point to
+		 * @param isFirstState if the looping state is first of a graph
+		 */
+		@SuppressWarnings("unchecked")
+		private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) {
+
+			final FilterFunction<T> filterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+			final FilterFunction<T> trueFunction = FilterFunctions.<T>trueFunction();
+
+			sourceState.addProceed(sinkState, trueFunction);
+			sourceState.addTake(filterFunction);
+			if (currentPattern instanceof FollowedByPattern || isFirstState) {
+				final State<T> ignoreState = new State<>(
+					currentPattern.getName(),
+					State.StateType.Normal);
+
+
+				final FilterFunction<T> ignoreCondition;
+				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
+					ignoreCondition = new NotFilterFunction<>(filterFunction);
+				} else {
+					ignoreCondition = trueFunction;
+				}
 
-			if (states.containsKey(BEGINNING_STATE_NAME)) {
-				beginningState = states.get(BEGINNING_STATE_NAME);
-			} else {
-				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
-				states.put(BEGINNING_STATE_NAME, beginningState);
+				sourceState.addIgnore(ignoreState, ignoreCondition);
+				ignoreState.addTake(sourceState, filterFunction);
+				ignoreState.addIgnore(ignoreState, ignoreCondition);
+				states.add(ignoreState);
 			}
+		}
 
-			beginningState.addStateTransition(new StateTransition<T>(
-				StateTransitionAction.TAKE,
-				currentState,
-				(FilterFunction<T>) currentPattern.getFilterFunction()
-			));
-
-			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+		/**
+		 * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
+		 * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
+		 * for each PROCEED transition branches in computation state graph  can be created only once.
+		 *
+		 * @param sourceState the state to converted
+		 * @param sinkState   the state that the converted state should point to
+		 */
+		private void convertToLooping(final State<T> sourceState, final State<T> sinkState) {
+			convertToLooping(sourceState, sinkState, false);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
new file mode 100644
index 0000000..12e58ba
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+public class FilterFunctions<T> {
+
+	private FilterFunctions() {
+	}
+
+	public static <T> FilterFunction<T> trueFunction()  {
+		return new FilterFunction<T>() {
+			@Override
+			public boolean filter(T value) throws Exception {
+				return true;
+			}
+		};
+	}
+
+	public static <T> FilterFunction<T> falseFunction()  {
+		return new FilterFunction<T>() {
+			@Override
+			public boolean filter(T value) throws Exception {
+				return false;
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
new file mode 100644
index 0000000..c85f3be
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+/**
+ * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern}
+ * was not specified correctly.
+ */
+public class MalformedPatternException extends RuntimeException {
+
+	private static final long serialVersionUID = 7751134834983361543L;
+
+	public MalformedPatternException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
new file mode 100644
index 0000000..a4fc8f5
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A filter function which negates filter function.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class NotFilterFunction<T> implements FilterFunction<T> {
+	private static final long serialVersionUID = -2109562093871155005L;
+
+	private final FilterFunction<T> original;
+
+	public NotFilterFunction(final FilterFunction<T> original) {
+		this.original = original;
+	}
+
+	@Override
+	public boolean filter(T value) throws Exception {
+		return !original.filter(value);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 7ea675f..7b4d9c7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Base class for a pattern definition.
@@ -53,6 +54,10 @@ public class Pattern<T, F extends T> {
 	// window length in which the pattern match has to occur
 	private Time windowTime;
 
+	private Quantifier quantifier = Quantifier.ONE;
+
+	private int times;
+
 	protected Pattern(final String name, final Pattern<T, ? extends T> previous) {
 		this.name = name;
 		this.previous = previous;
@@ -74,6 +79,14 @@ public class Pattern<T, F extends T> {
 		return windowTime;
 	}
 
+	public Quantifier getQuantifier() {
+		return quantifier;
+	}
+
+	public int getTimes() {
+		return times;
+	}
+
 	/**
 	 * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
 	 *
@@ -183,4 +196,106 @@ public class Pattern<T, F extends T> {
 		return new Pattern<X, X>(name, null);
 	}
 
+	/**
+	 * Specifies that this pattern can occur zero or more times(kleene star).
+	 * This means any number of events can be matched in this state.
+	 *
+	 * @return The same pattern with applied Kleene star operator
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> zeroOrMore() {
+		return zeroOrMore(true);
+	}
+
+	/**
+	 * Specifies that this pattern can occur zero or more times(kleene star).
+	 * This means any number of events can be matched in this state.
+	 *
+	 * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will generate patterns:
+	 * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.
+	 *
+	 * @param eager if true the pattern always consumes earlier events
+	 * @return The same pattern with applied Kleene star operator
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> zeroOrMore(final boolean eager) {
+		checkIfQuantifierApplied();
+		if (eager) {
+			this.quantifier = Quantifier.ZERO_OR_MORE_EAGER;
+		} else {
+			this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS;
+		}
+		return this;
+	}
+
+	/**
+	 * Specifies that this pattern can occur one or more times(kleene star).
+	 * This means at least one and at most infinite number of events can be matched in this state.
+	 *
+	 * @return The same pattern with applied Kleene plus operator
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> oneOrMore() {
+		return oneOrMore(true);
+	}
+
+	/**
+	 * Specifies that this pattern can occur one or more times(kleene star).
+	 * This means at least one and at most infinite number of events can be matched in this state.
+	 *
+	 * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will generate patterns:
+	 * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.
+	 *
+	 * @param eager if true the pattern always consumes earlier events
+	 * @return The same pattern with applied Kleene plus operator
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> oneOrMore(final boolean eager) {
+		checkIfQuantifierApplied();
+		if (eager) {
+			this.quantifier = Quantifier.ONE_OR_MORE_EAGER;
+		} else {
+			this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS;
+		}
+		return this;
+	}
+
+	/**
+	 * Specifies that this pattern can occur zero or once.
+	 *
+	 * @return The same pattern with applied Kleene ? operator
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> optional() {
+		checkIfQuantifierApplied();
+		this.quantifier = Quantifier.OPTIONAL;
+		return this;
+	}
+
+	/**
+	 * Specifies exact number of times that this pattern should be matched.
+	 *
+	 * @param times number of times matching event must appear
+	 * @return The same pattern with number of times applied
+	 *
+	 * @throws MalformedPatternException if quantifier already applied
+	 */
+	public Pattern<T, F> times(int times) {
+		checkIfQuantifierApplied();
+		Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
+		this.quantifier = Quantifier.TIMES;
+		this.times = times;
+		return this;
+	}
+
+	private void checkIfQuantifierApplied() {
+		if (this.quantifier != Quantifier.ONE) {
+			throw new MalformedPatternException("Already applied quantifier to this Pattern. Current quantifier is: " + this.quantifier);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
new file mode 100644
index 0000000..7abe9bd
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern;
+
+import java.util.EnumSet;
+
+public enum Quantifier {
+	ONE,
+	ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, QuantifierProperty.EAGER),
+	ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
+	ONE_OR_MORE_EAGER(
+		QuantifierProperty.LOOPING,
+		QuantifierProperty.EAGER,
+		QuantifierProperty.AT_LEAST_ONE),
+	ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, QuantifierProperty.AT_LEAST_ONE),
+	TIMES,
+	OPTIONAL;
+
+	private final EnumSet<QuantifierProperty> properties;
+
+	Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) {
+		this.properties = EnumSet.of(first, rest);
+	}
+
+	Quantifier() {
+		this.properties = EnumSet.noneOf(QuantifierProperty.class);
+	}
+
+	public boolean hasProperty(QuantifierProperty property) {
+		return properties.contains(property);
+	}
+
+	public enum QuantifierProperty {
+		LOOPING,
+		EAGER,
+		AT_LEAST_ONE
+	}
+
+}