You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/07/05 13:56:05 UTC

[2/5] flink git commit: [FLINK-9593][cep] Unified After Match semantics with SQL MATCH_RECOGNIZE

[FLINK-9593][cep] Unified After Match semantics with SQL MATCH_RECOGNIZE

This closes #6171


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d934cb8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d934cb8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d934cb8f

Branch: refs/heads/master
Commit: d934cb8fba22855894e260e3e68533b599ac8e43
Parents: d02167d
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Thu Jun 14 17:10:05 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Thu Jul 5 15:54:54 2018 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  69 +++--
 .../flink/cep/scala/pattern/Pattern.scala       |   2 +-
 .../flink/cep/nfa/AfterMatchSkipStrategy.java   | 139 ---------
 .../apache/flink/cep/nfa/ComputationState.java  |  21 +-
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |   4 +
 .../apache/flink/cep/nfa/MigrationUtils.java    |  10 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 216 ++++++-------
 .../java/org/apache/flink/cep/nfa/NFAState.java |  48 ++-
 .../flink/cep/nfa/NFAStateSerializer.java       |  98 ++++--
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  16 +-
 .../nfa/aftermatch/AfterMatchSkipStrategy.java  | 155 ++++++++++
 .../cep/nfa/aftermatch/NoSkipStrategy.java      |  58 ++++
 .../nfa/aftermatch/SkipPastLastStrategy.java    |  65 ++++
 .../cep/nfa/aftermatch/SkipToFirstStrategy.java |  76 +++++
 .../cep/nfa/aftermatch/SkipToLastStrategy.java  |  76 +++++
 .../flink/cep/nfa/compiler/NFACompiler.java     |  10 +-
 .../flink/cep/nfa/sharedbuffer/EventId.java     |  11 +-
 .../cep/nfa/sharedbuffer/SharedBuffer.java      |  38 ++-
 .../AbstractKeyedCEPPatternOperator.java        |   2 +-
 .../flink/cep/operator/CEPOperatorUtils.java    |   2 +-
 .../cep/operator/FlatSelectCepOperator.java     |   2 +-
 .../operator/FlatSelectTimeoutCepOperator.java  |   2 +-
 .../flink/cep/operator/SelectCepOperator.java   |   2 +-
 .../cep/operator/SelectTimeoutCepOperator.java  |   2 +-
 .../apache/flink/cep/pattern/GroupPattern.java  |   2 +-
 .../org/apache/flink/cep/pattern/Pattern.java   |   2 +-
 .../java/org/apache/flink/cep/CEPITCase.java    |   2 +-
 .../flink/cep/nfa/AfterMatchSkipITCase.java     | 307 +++++++++++++++++--
 .../org/apache/flink/cep/nfa/GroupITCase.java   |   4 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java     |  16 +-
 .../apache/flink/cep/nfa/NFATestUtilities.java  |   1 +
 .../apache/flink/cep/nfa/SameElementITCase.java |   8 +-
 .../flink/cep/nfa/UntilConditionITCase.java     |  40 +--
 .../flink/cep/nfa/compiler/NFACompilerTest.java |   2 +-
 .../cep/nfa/sharedbuffer/SharedBufferTest.java  |  37 ++-
 35 files changed, 1139 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 921718a..6723e71 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1252,13 +1252,13 @@ pattern.within(Time.seconds(10))
 For a given pattern, the same event may be assigned to multiple successful matches. To control to how many matches an event will be assigned, you need to specify the skip strategy called `AfterMatchSkipStrategy`. There are four types of skip strategies, listed as follows:
 
 * <strong>*NO_SKIP*</strong>: Every possible match will be emitted.
-* <strong>*SKIP_PAST_LAST_EVENT*</strong>: Discards every partial match that contains event of the match.
-* <strong>*SKIP_TO_FIRST*</strong>: Discards every partial match that contains event of the match preceding the first of *PatternName*.
-* <strong>*SKIP_TO_LAST*</strong>: Discards every partial match that contains event of the match preceding the last of *PatternName*.
+* <strong>*SKIP_PAST_LAST_EVENT*</strong>: Discards every partial match that started after the match started but before it ended.
+* <strong>*SKIP_TO_FIRST*</strong>: Discards every partial match that started after the match started but before the first event of *PatternName* occurred.
+* <strong>*SKIP_TO_LAST*</strong>: Discards every partial match that started after the match started but before the last event of *PatternName* occurred.
 
 Notice that when using *SKIP_TO_FIRST* and *SKIP_TO_LAST* skip strategy, a valid *PatternName* should also be specified.
 
-For example, for a given pattern `a b{2}` and a data stream `ab1, ab2, ab3, ab4, ab5, ab6`, the differences between these four skip strategies are as follows:
+For example, for a given pattern `b+ c` and a data stream `b1 b2 b3 c`, the differences between these four skip strategies are as follows:
 
 <table class="table table-bordered">
     <tr>
@@ -1269,38 +1269,65 @@ For example, for a given pattern `a b{2}` and a data stream `ab1, ab2, ab3, ab4,
     <tr>
         <td><strong>NO_SKIP</strong></td>
         <td>
-            <code>ab1 ab2 ab3</code><br>
-            <code>ab2 ab3 ab4</code><br>
-            <code>ab3 ab4 ab5</code><br>
-            <code>ab4 ab5 ab6</code><br>
+            <code>b1 b2 b3 c</code><br>
+            <code>b2 b3 c</code><br>
+            <code>b3 c</code><br>
         </td>
-        <td>After found matching <code>ab1 ab2 ab3</code>, the match process will not discard any result.</td>
+        <td>After found matching <code>b1 b2 b3 c</code>, the match process will not discard any result.</td>
     </tr>
     <tr>
         <td><strong>SKIP_PAST_LAST_EVENT</strong></td>
         <td>
-            <code>ab1 ab2 ab3</code><br>
-            <code>ab4 ab5 ab6</code><br>
+            <code>b1 b2 b3 c</code><br>
         </td>
-        <td>After found matching <code>ab1 ab2 ab3</code>, the match process will discard all started partial matches.</td>
+        <td>After found matching <code>b1 b2 b3 c</code>, the match process will discard all started partial matches.</td>
     </tr>
     <tr>
-        <td><strong>SKIP_TO_FIRST</strong>[<code>b</code>]</td>
+        <td><strong>SKIP_TO_FIRST</strong>[<code>b*</code>]</td>
         <td>
-            <code>ab1 ab2 ab3</code><br>
-            <code>ab2 ab3 ab4</code><br>
-            <code>ab3 ab4 ab5</code><br>
-            <code>ab4 ab5 ab6</code><br>
+            <code>b1 b2 b3 c</code><br>
+            <code>b2 b3 c</code><br>
+            <code>b3 c</code><br>
         </td>
-        <td>After found matching <code>ab1 ab2 ab3</code>, the match process will discard all partial matches containing <code>ab1</code>, which is the only event that comes before the first <code>b</code>.</td>
+        <td>After found matching <code>b1 b2 b3 c</code>, the match process will try to discard all partial matches started before <code>b1</code>, but there are no such matches. Therefore nothing will be discarded.</td>
     </tr>
     <tr>
         <td><strong>SKIP_TO_LAST</strong>[<code>b</code>]</td>
         <td>
-            <code>ab1 ab2 ab3</code><br>
-            <code>ab3 ab4 ab5</code><br>
+            <code>b1 b2 b3 c</code><br>
+            <code>b3 c</code><br>
         </td>
-        <td>After found matching <code>ab1 ab2 ab3</code>, the match process will discard all partial matches containing <code>ab1</code> and <code>ab2</code>, which are events that comes before the last <code>b</code>.</td>
+        <td>After found matching <code>b1 b2 b3 c</code>, the match process will try to discard all partial matches started before <code>b3</code>. There is one such match <code>b2 b3 c</code>.</td>
+    </tr>
+</table>
+
+Here is another example that better illustrates the difference between NO_SKIP and SKIP_TO_FIRST.
+For the pattern `(a | c) (b | c) c+.greedy d` and the sequence `a b c1 c2 c3 d`, the results will be:
+
+
+<table class="table table-bordered">
+    <tr>
+        <th class="text-left" style="width: 25%">Skip Strategy</th>
+        <th class="text-center" style="width: 25%">Result</th>
+        <th class="text-center"> Description</th>
+    </tr>
+    <tr>
+        <td><strong>NO_SKIP</strong></td>
+        <td>
+            <code>a b c1 c2 c3 d</code><br>
+            <code>b c1 c2 c3 d</code><br>
+            <code>c1 c2 c3 d</code><br>
+            <code>c2 c3 d</code><br>
+        </td>
+        <td>After found matching <code>a b c1 c2 c3 d</code>, the match process will not discard any result.</td>
+    </tr>
+    <tr>
+        <td><strong>SKIP_TO_FIRST</strong>[<code>c*</code>]</td>
+        <td>
+            <code>a b c1 c2 c3 d</code><br>
+            <code>c1 c2 c3 d</code><br>
+        </td>
+        <td>After found matching <code>a b c1 c2 c3 d</code>, the match process will try to discard all partial matches started before <code>c1</code>. There is one such match <code>b c1 c2 c3 d</code>.</td>
     </tr>
 </table>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 42a95e8..c2d2788 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.cep.scala.pattern
 
 import org.apache.flink.cep
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
 import org.apache.flink.cep.pattern.conditions.IterativeCondition.{Context => JContext}
 import org.apache.flink.cep.pattern.conditions.{IterativeCondition, SimpleCondition}
 import org.apache.flink.cep.pattern.{MalformedPatternException, Quantifier, Pattern => JPattern}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
deleted file mode 100644
index dcda441..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa;
-
-import java.io.Serializable;
-
-
-/**
- * Indicate the skip strategy after a match process.
- *
- * <p>For more info on possible skip strategies see {@link SkipStrategy}.
- */
-public class AfterMatchSkipStrategy implements Serializable {
-
-	private static final long serialVersionUID = -4048930333619068531L;
-	// default strategy
-	private SkipStrategy strategy = SkipStrategy.NO_SKIP;
-
-	// pattern name to skip to
-	private String patternName = null;
-
-	/**
-	 * Discards every partial match that contains event of the match preceding the first of *PatternName*.
-	 * @param patternName the pattern name to skip to
-	 * @return the created AfterMatchSkipStrategy
-	 */
-	public static AfterMatchSkipStrategy skipToFirst(String patternName) {
-		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_FIRST, patternName);
-	}
-
-	/**
-	 * Discards every partial match that contains event of the match preceding the last of *PatternName*.
-	 * @param patternName the pattern name to skip to
-	 * @return the created AfterMatchSkipStrategy
-	 */
-	public static AfterMatchSkipStrategy skipToLast(String patternName) {
-		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_LAST, patternName);
-	}
-
-	/**
-	 * Discards every partial match that contains event of the match.
-	 * @return the created AfterMatchSkipStrategy
-	 */
-	public static AfterMatchSkipStrategy skipPastLastEvent() {
-		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_PAST_LAST_EVENT);
-	}
-
-	/**
-	 * Every possible match will be emitted.
-	 * @return the created AfterMatchSkipStrategy
-	 */
-	public static AfterMatchSkipStrategy noSkip() {
-		return new AfterMatchSkipStrategy(SkipStrategy.NO_SKIP);
-	}
-
-	private AfterMatchSkipStrategy(SkipStrategy strategy) {
-		this(strategy, null);
-	}
-
-	private AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
-		if (patternName == null && (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
-			throw new IllegalArgumentException("The patternName field can not be empty when SkipStrategy is " + strategy);
-		}
-		this.strategy = strategy;
-		this.patternName = patternName;
-	}
-
-	/**
-	 * Get the {@link SkipStrategy} enum.
-	 * @return the skip strategy
-	 */
-	public SkipStrategy getStrategy() {
-		return strategy;
-	}
-
-	/**
-	 * Get the referenced pattern name of this strategy.
-	 * @return the referenced pattern name.
-	 */
-	public String getPatternName() {
-		return patternName;
-	}
-
-	@Override
-	public String toString() {
-		switch (strategy) {
-			case NO_SKIP:
-			case SKIP_PAST_LAST_EVENT:
-				return "AfterMatchStrategy{" +
-					strategy +
-					"}";
-			case SKIP_TO_FIRST:
-			case SKIP_TO_LAST:
-				return "AfterMatchStrategy{" +
-					strategy + "[" +
-					patternName + "]" +
-					"}";
-		}
-		return super.toString();
-	}
-
-	/**
-	 * Skip Strategy Enum.
-	 */
-	public enum SkipStrategy{
-		/**
-		 * Every possible match will be emitted.
-		 */
-		NO_SKIP,
-		/**
-		 * Discards every partial match that contains event of the match.
-		 */
-		SKIP_PAST_LAST_EVENT,
-		/**
-		 * Discards every partial match that contains event of the match preceding the first of *PatternName*.
-		 */
-		SKIP_TO_FIRST,
-		/**
-		 * Discards every partial match that contains event of the match preceding the last of *PatternName*.
-		 */
-		SKIP_TO_LAST
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 65715a7..2edc7ee 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 
 import javax.annotation.Nullable;
@@ -42,15 +43,24 @@ public class ComputationState {
 	@Nullable
 	private final NodeId previousBufferEntry;
 
+	@Nullable
+	private final EventId startEventID;
+
 	private ComputationState(
 			final String currentState,
 			@Nullable final NodeId previousBufferEntry,
 			final DeweyNumber version,
+			@Nullable final EventId startEventID,
 			final long startTimestamp) {
 		this.currentStateName = currentState;
 		this.version = version;
 		this.startTimestamp = startTimestamp;
 		this.previousBufferEntry = previousBufferEntry;
+		this.startEventID = startEventID;
+	}
+
+	public EventId getStartEventID() {
+		return startEventID;
 	}
 
 	public NodeId getPreviousBufferEntry() {
@@ -76,6 +86,7 @@ public class ComputationState {
 			return Objects.equals(currentStateName, other.currentStateName) &&
 				Objects.equals(version, other.version) &&
 				startTimestamp == other.startTimestamp &&
+				Objects.equals(startEventID, other.startEventID) &&
 				Objects.equals(previousBufferEntry, other.previousBufferEntry);
 		} else {
 			return false;
@@ -89,12 +100,13 @@ public class ComputationState {
 			", version=" + version +
 			", startTimestamp=" + startTimestamp +
 			", previousBufferEntry=" + previousBufferEntry +
+			", startEventID=" + startEventID +
 			'}';
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(currentStateName, version, startTimestamp, previousBufferEntry);
+		return Objects.hash(currentStateName, version, startTimestamp, startEventID, previousBufferEntry);
 	}
 
 	public static ComputationState createStartState(final String state) {
@@ -102,14 +114,15 @@ public class ComputationState {
 	}
 
 	public static ComputationState createStartState(final String state, final DeweyNumber version) {
-		return createState(state, null, version, -1L);
+		return createState(state, null, version, -1L, null);
 	}
 
 	public static ComputationState createState(
 			final String currentState,
 			final NodeId previousEntry,
 			final DeweyNumber version,
-			final long startTimestamp) {
-		return new ComputationState(currentState, previousEntry, version, startTimestamp);
+			final long startTimestamp,
+			final EventId startEventID) {
+		return new ComputationState(currentState, previousEntry, version, startEventID, startTimestamp);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index 34897fa..68e0eec 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -90,6 +90,10 @@ public class DeweyNumber implements Serializable {
 		}
 	}
 
+	public int getRun() {
+		return deweyNumber[0];
+	}
+
 	public int length() {
 		return deweyNumber.length;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java
index f1656a1..86d7fac 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.EnumSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.util.Preconditions;
@@ -40,7 +41,7 @@ class MigrationUtils {
 	/**
 	 * Skips bytes corresponding to serialized states. In flink 1.6+ the states are no longer kept in state.
 	 */
-	static <T> void skipSerializedStates(DataInputView in) throws IOException {
+	static void skipSerializedStates(DataInputView in) throws IOException {
 		TypeSerializer<String> nameSerializer = StringSerializer.INSTANCE;
 		TypeSerializer<State.StateType> stateTypeSerializer = new EnumSerializer<>(State.StateType.class);
 		TypeSerializer<StateTransitionAction> actionSerializer = new EnumSerializer<>(StateTransitionAction.class);
@@ -73,7 +74,7 @@ class MigrationUtils {
 		}
 	}
 
-	private static <T> void skipCondition(DataInputView in) throws IOException, ClassNotFoundException {
+	private static void skipCondition(DataInputView in) throws IOException, ClassNotFoundException {
 		boolean hasCondition = in.readBoolean();
 		if (hasCondition) {
 			int length = in.readInt();
@@ -115,13 +116,16 @@ class MigrationUtils {
 			}
 
 			NodeId nodeId;
+			EventId startEventId;
 			if (prevState != null) {
 				nodeId = sharedBuffer.getNodeId(prevState, timestamp, counter, event);
+				startEventId = sharedBuffer.getStartEventId(version.getRun());
 			} else {
 				nodeId = null;
+				startEventId = null;
 			}
 
-			computationStates.add(ComputationState.createState(state, nodeId, version, startTimestamp));
+			computationStates.add(ComputationState.createState(state, nodeId, version, startTimestamp, startEventId));
 		}
 		return computationStates;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 276cde7..815b25a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
@@ -45,14 +46,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Queue;
-import java.util.Set;
 import java.util.Stack;
 
 import static org.apache.flink.cep.nfa.MigrationUtils.deserializeComputationStates;
@@ -237,18 +236,18 @@ public class NFA<T> {
 			final NFAState nfaState,
 			final long timestamp) throws Exception {
 
-		Queue<ComputationState> computationStates = nfaState.getComputationStates();
 		final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
+		final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 
-		final int numberComputationStates = computationStates.size();
-		for (int i = 0; i < numberComputationStates; i++) {
-			ComputationState computationState = computationStates.poll();
-
+		Map<EventId, T> eventsCache = new HashMap<>();
+		for (ComputationState computationState : nfaState.getPartialMatches()) {
 			if (isStateTimedOut(computationState, timestamp)) {
 
 				if (handleTimeout) {
 					// extract the timed out event pattern
-					Map<String, List<T>> timedOutPattern = extractCurrentMatches(sharedBuffer, computationState);
+					Map<String, List<T>> timedOutPattern = sharedBuffer.materializeMatch(extractCurrentMatches(
+						sharedBuffer,
+						computationState), eventsCache);
 					timeoutResult.add(Tuple2.of(timedOutPattern, timestamp));
 				}
 
@@ -256,10 +255,12 @@ public class NFA<T> {
 
 				nfaState.setStateChanged();
 			} else {
-				computationStates.add(computationState);
+				newPartialMatches.add(computationState);
 			}
 		}
 
+		nfaState.setNewPartialMatches(newPartialMatches);
+
 		sharedBuffer.advanceTime(timestamp);
 
 		return timeoutResult;
@@ -275,15 +276,11 @@ public class NFA<T> {
 			final EventWrapper event,
 			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
 
-		Queue<ComputationState> computationStates = nfaState.getComputationStates();
-
-		final int numberComputationStates = computationStates.size();
-		final Collection<Map<String, List<T>>> result = new ArrayList<>();
+		final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
+		final PriorityQueue<ComputationState> potentialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 
 		// iterate over all current computations
-		for (int i = 0; i < numberComputationStates; i++) {
-			ComputationState computationState = computationStates.poll();
-
+		for (ComputationState computationState : nfaState.getPartialMatches()) {
 			final Collection<ComputationState> newComputationStates = computeNextStates(
 				sharedBuffer,
 				computationState,
@@ -303,12 +300,7 @@ public class NFA<T> {
 			for (final ComputationState newComputationState : newComputationStates) {
 
 				if (isFinalState(newComputationState)) {
-					// we've reached a final state and can thus retrieve the matching event sequence
-					Map<String, List<T>> matchedPattern = extractCurrentMatches(sharedBuffer, newComputationState);
-					result.add(matchedPattern);
-
-					// remove found patterns because they are no longer needed
-					sharedBuffer.releaseNode(newComputationState.getPreviousBufferEntry());
+					potentialMatches.add(newComputationState);
 				} else if (isStopState(newComputationState)) {
 					//reached stop state. release entry for the stop state
 					shouldDiscardPath = true;
@@ -326,81 +318,89 @@ public class NFA<T> {
 					sharedBuffer.releaseNode(state.getPreviousBufferEntry());
 				}
 			} else {
-				computationStates.addAll(statesToRetain);
+				newPartialMatches.addAll(statesToRetain);
 			}
 		}
 
-		discardComputationStatesAccordingToStrategy(
-			sharedBuffer, computationStates, result, afterMatchSkipStrategy);
+		if (!potentialMatches.isEmpty()) {
+			nfaState.setStateChanged();
+		}
+
+		List<Map<String, List<T>>> result = new ArrayList<>();
+		if (afterMatchSkipStrategy.isSkipStrategy()) {
+			processMatchesAccordingToSkipStrategy(sharedBuffer,
+				nfaState,
+				afterMatchSkipStrategy,
+				potentialMatches,
+				newPartialMatches,
+				result);
+		} else {
+			for (ComputationState match : potentialMatches) {
+				Map<EventId, T> eventsCache = new HashMap<>();
+				Map<String, List<T>> materializedMatch =
+					sharedBuffer.materializeMatch(
+						sharedBuffer.extractPatterns(
+							match.getPreviousBufferEntry(),
+							match.getVersion()).get(0),
+						eventsCache
+					);
+
+				result.add(materializedMatch);
+				sharedBuffer.releaseNode(match.getPreviousBufferEntry());
+			}
+		}
+
+		nfaState.setNewPartialMatches(newPartialMatches);
 
 		return result;
 	}
 
-	private void discardComputationStatesAccordingToStrategy(
-			final SharedBuffer<T> sharedBuffer,
-			final Queue<ComputationState> computationStates,
-			final Collection<Map<String, List<T>>> matchedResult,
-			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+	private void processMatchesAccordingToSkipStrategy(
+			SharedBuffer<T> sharedBuffer,
+			NFAState nfaState,
+			AfterMatchSkipStrategy afterMatchSkipStrategy,
+			PriorityQueue<ComputationState> potentialMatches,
+			PriorityQueue<ComputationState> partialMatches,
+			List<Map<String, List<T>>> result) throws Exception {
 
-		Set<T> discardEvents = new HashSet<>();
-		switch(afterMatchSkipStrategy.getStrategy()) {
-			case SKIP_TO_LAST:
-				for (Map<String, List<T>> resultMap: matchedResult) {
-					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
-						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
-							discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
-							break;
-						} else {
-							discardEvents.addAll(keyMatches.getValue());
-						}
-					}
-				}
-				break;
-			case SKIP_TO_FIRST:
-				for (Map<String, List<T>> resultMap: matchedResult) {
-					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
-						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
-							break;
-						} else {
-							discardEvents.addAll(keyMatches.getValue());
-						}
-					}
-				}
-				break;
-			case SKIP_PAST_LAST_EVENT:
-				for (Map<String, List<T>> resultMap: matchedResult) {
-					for (List<T> eventList: resultMap.values()) {
-						discardEvents.addAll(eventList);
-					}
-				}
-				break;
-		}
-		if (!discardEvents.isEmpty()) {
-			List<ComputationState> discardStates = new ArrayList<>();
-			for (ComputationState computationState : computationStates) {
-				boolean discard = false;
-				Map<String, List<T>> partialMatch = extractCurrentMatches(sharedBuffer, computationState);
-				for (List<T> list: partialMatch.values()) {
-					for (T e: list) {
-						if (discardEvents.contains(e)) {
-							// discard the computation state.
-							discard = true;
-							break;
-						}
-					}
-					if (discard) {
-						break;
-					}
-				}
-				if (discard) {
-					sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
-					discardStates.add(computationState);
-				}
+		nfaState.getCompletedMatches().addAll(potentialMatches);
+
+		ComputationState earliestMatch = nfaState.getCompletedMatches().peek();
+
+		if (earliestMatch != null) {
+
+			Map<EventId, T> eventsCache = new HashMap<>();
+			ComputationState earliestPartialMatch;
+			while (earliestMatch != null && ((earliestPartialMatch = partialMatches.peek()) == null ||
+				isEarlier(earliestMatch, earliestPartialMatch))) {
+
+				nfaState.setStateChanged();
+				nfaState.getCompletedMatches().poll();
+				List<Map<String, List<EventId>>> matchedResult =
+					sharedBuffer.extractPatterns(earliestMatch.getPreviousBufferEntry(), earliestMatch.getVersion());
+
+				afterMatchSkipStrategy.prune(
+					partialMatches,
+					matchedResult,
+					sharedBuffer);
+
+				afterMatchSkipStrategy.prune(
+					nfaState.getCompletedMatches(),
+					matchedResult,
+					sharedBuffer);
+
+				result.add(sharedBuffer.materializeMatch(matchedResult.get(0), eventsCache));
+				earliestMatch = nfaState.getCompletedMatches().peek();
 			}
-			computationStates.removeAll(discardStates);
+
+			nfaState.getPartialMatches().removeIf(pm -> pm.getStartEventID() != null && !partialMatches.contains(pm));
 		}
 	}
 
+	private boolean isEarlier(ComputationState earliestMatch, ComputationState earliestPartialMatch) {
+		return NFAState.COMPUTATION_STATE_COMPARATOR.compare(earliestMatch, earliestPartialMatch) <= 0;
+	}
+
 	private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
 		return s1.getName().equals(s2.getName());
 	}
@@ -569,12 +569,13 @@ public class NFA<T> {
 						}
 
 						addComputationState(
-								sharedBuffer,
-								resultingComputationStates,
-								edge.getTargetState(),
-								computationState.getPreviousBufferEntry(),
-								version,
-								computationState.getStartTimestamp()
+							sharedBuffer,
+							resultingComputationStates,
+							edge.getTargetState(),
+							computationState.getPreviousBufferEntry(),
+							version,
+							computationState.getStartTimestamp(),
+							computationState.getStartEventID()
 						);
 					}
 				}
@@ -596,10 +597,13 @@ public class NFA<T> {
 						currentVersion);
 
 					final long startTimestamp;
+					final EventId startEventId;
 					if (isStartState(computationState)) {
 						startTimestamp = timestamp;
+						startEventId = event.getEventId();
 					} else {
 						startTimestamp = computationState.getStartTimestamp();
+						startEventId = computationState.getStartEventID();
 					}
 
 					addComputationState(
@@ -608,7 +612,8 @@ public class NFA<T> {
 							nextState,
 							newEntry,
 							nextVersion,
-							startTimestamp);
+							startTimestamp,
+							startEventId);
 
 					//check if newly created state is optional (have a PROCEED path to Final state)
 					final State<T> finalState = findFinalStateAfterProceed(context, nextState, event.getEvent());
@@ -619,7 +624,8 @@ public class NFA<T> {
 								finalState,
 								newEntry,
 								nextVersion,
-								startTimestamp);
+								startTimestamp,
+								startEventId);
 					}
 					break;
 			}
@@ -649,9 +655,10 @@ public class NFA<T> {
 			State<T> currentState,
 			NodeId previousEntry,
 			DeweyNumber version,
-			long startTimestamp) throws Exception {
+			long startTimestamp,
+			EventId startEventId) throws Exception {
 		ComputationState computationState = ComputationState.createState(
-				currentState.getName(), previousEntry, version, startTimestamp);
+				currentState.getName(), previousEntry, version, startTimestamp, startEventId);
 		computationStates.add(computationState);
 
 		sharedBuffer.lockNode(previousEntry);
@@ -745,14 +752,14 @@ public class NFA<T> {
 	 * @return Collection of event sequences which end in the given computation state
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
-	private Map<String, List<T>> extractCurrentMatches(
+	private Map<String, List<EventId>> extractCurrentMatches(
 			final SharedBuffer<T> sharedBuffer,
 			final ComputationState computationState) throws Exception {
 		if (computationState.getPreviousBufferEntry() == null) {
 			return new HashMap<>();
 		}
 
-		List<Map<String, List<T>>> paths = sharedBuffer.extractPatterns(
+		List<Map<String, List<EventId>>> paths = sharedBuffer.extractPatterns(
 				computationState.getPreviousBufferEntry(),
 				computationState.getVersion());
 
@@ -762,15 +769,7 @@ public class NFA<T> {
 		// for a given computation state, we cannot have more than one matching patterns.
 		Preconditions.checkState(paths.size() == 1);
 
-		Map<String, List<T>> result = new LinkedHashMap<>();
-		Map<String, List<T>> path = paths.get(0);
-		for (String key: path.keySet()) {
-			List<T> events = path.get(key);
-
-			List<T> values = result.computeIfAbsent(key, k -> new ArrayList<>(events.size()));
-			values.addAll(events);
-		}
-		return result;
+		return paths.get(0);
 	}
 
 	/**
@@ -809,7 +808,8 @@ public class NFA<T> {
 			// this is to avoid any overheads when using a simple, non-iterative condition.
 
 			if (matchedEvents == null) {
-				this.matchedEvents = nfa.extractCurrentMatches(sharedBuffer, computationState);
+				this.matchedEvents = sharedBuffer.materializeMatch(nfa.extractCurrentMatches(sharedBuffer,
+					computationState));
 			}
 
 			return new Iterable<T>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java
index a89c72e..dae7013 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.cep.nfa;
 
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Objects;
+import java.util.PriorityQueue;
 import java.util.Queue;
 
 /**
@@ -31,16 +34,33 @@ public class NFAState {
 	 * These are the "active" intermediate states that are waiting for new matching
 	 * events to transition to new valid states.
 	 */
-	private final Queue<ComputationState> computationStates;
+	private Queue<ComputationState> partialMatches;
+
+	private Queue<ComputationState> completedMatches;
 
 	/**
 	 * Flag indicating whether the matching status of the state machine has changed.
 	 */
 	private boolean stateChanged;
 
-	public NFAState(Queue<ComputationState> computationStates) {
-		this.computationStates = computationStates;
-		this.stateChanged = false;
+	public static final Comparator<ComputationState> COMPUTATION_STATE_COMPARATOR =
+		Comparator.<ComputationState>comparingLong(c ->
+				c.getStartEventID() != null ? c.getStartEventID().getTimestamp() : Long.MAX_VALUE)
+			.thenComparingInt(c ->
+				c.getStartEventID() != null ? c.getStartEventID().getId() : Integer.MAX_VALUE);
+
+	public NFAState(Iterable<ComputationState> states) {
+		this.partialMatches = new PriorityQueue<>(COMPUTATION_STATE_COMPARATOR);
+		for (ComputationState startingState : states) {
+			partialMatches.add(startingState);
+		}
+
+		this.completedMatches = new PriorityQueue<>(COMPUTATION_STATE_COMPARATOR);
+	}
+
+	public NFAState(Queue<ComputationState> partialMatches, Queue<ComputationState> completedMatches) {
+		this.partialMatches = partialMatches;
+		this.completedMatches = completedMatches;
 	}
 
 	/**
@@ -66,8 +86,16 @@ public class NFAState {
 		this.stateChanged = true;
 	}
 
-	public Queue<ComputationState> getComputationStates() {
-		return computationStates;
+	public Queue<ComputationState> getPartialMatches() {
+		return partialMatches;
+	}
+
+	public Queue<ComputationState> getCompletedMatches() {
+		return completedMatches;
+	}
+
+	public void setNewPartialMatches(PriorityQueue<ComputationState> newPartialMatches) {
+		this.partialMatches = newPartialMatches;
 	}
 
 	@Override
@@ -79,18 +107,20 @@ public class NFAState {
 			return false;
 		}
 		NFAState nfaState = (NFAState) o;
-		return 	Objects.equals(computationStates, nfaState.computationStates);
+		return Arrays.equals(partialMatches.toArray(), nfaState.partialMatches.toArray()) &&
+			Arrays.equals(completedMatches.toArray(), nfaState.completedMatches.toArray());
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(computationStates, stateChanged);
+		return Objects.hash(partialMatches, completedMatches);
 	}
 
 	@Override
 	public String toString() {
 		return "NFAState{" +
-			"computationStates=" + computationStates +
+			"partialMatches=" + partialMatches +
+			", completedMatches=" + completedMatches +
 			", stateChanged=" + stateChanged +
 			'}';
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
index bac144d..05b6c91 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -31,7 +32,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.PriorityQueue;
 import java.util.Queue;
 
 /**
@@ -89,43 +90,61 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 		return -1;
 	}
 
+	private static final StringSerializer STATE_NAME_SERIALIZER = StringSerializer.INSTANCE;
+	private static final LongSerializer TIMESTAMP_SERIALIZER = LongSerializer.INSTANCE;
+	private static final DeweyNumber.DeweyNumberSerializer VERSION_SERIALIZER = DeweyNumber.DeweyNumberSerializer.INSTANCE;
+	private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = NodeId.NodeIdSerializer.INSTANCE;
+	private static final EventId.EventIdSerializer EVENT_ID_SERIALIZER = EventId.EventIdSerializer.INSTANCE;
+
 	@Override
 	public void serialize(NFAState record, DataOutputView target) throws IOException {
+		serializeComputationStates(record.getPartialMatches(), target);
+		serializeComputationStates(record.getCompletedMatches(), target);
+	}
 
-		target.writeInt(record.getComputationStates().size());
-
-		StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
-		LongSerializer timestampSerializer = LongSerializer.INSTANCE;
-		DeweyNumber.DeweyNumberSerializer versionSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
-		NodeId.NodeIdSerializer nodeIdSerializer = NodeId.NodeIdSerializer.INSTANCE;
-
-		for (ComputationState computationState : record.getComputationStates()) {
-			stateNameSerializer.serialize(computationState.getCurrentStateName(), target);
-			nodeIdSerializer.serialize(computationState.getPreviousBufferEntry(), target);
-
-			versionSerializer.serialize(computationState.getVersion(), target);
-			timestampSerializer.serialize(computationState.getStartTimestamp(), target);
+	private void serializeComputationStates(Queue<ComputationState> states, DataOutputView target) throws IOException {
+		target.writeInt(states.size());
+		for (ComputationState computationState : states) {
+			STATE_NAME_SERIALIZER.serialize(computationState.getCurrentStateName(), target);
+			NODE_ID_SERIALIZER.serialize(computationState.getPreviousBufferEntry(), target);
+
+			VERSION_SERIALIZER.serialize(computationState.getVersion(), target);
+			TIMESTAMP_SERIALIZER.serialize(computationState.getStartTimestamp(), target);
+			if (computationState.getStartEventID() != null) {
+				target.writeByte(1);
+				EVENT_ID_SERIALIZER.serialize(computationState.getStartEventID(), target);
+			} else {
+				target.writeByte(0);
+			}
 		}
 	}
 
 	@Override
 	public NFAState deserialize(DataInputView source) throws IOException {
-		Queue<ComputationState> computationStates = new LinkedList<>();
-		StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
-		LongSerializer timestampSerializer = LongSerializer.INSTANCE;
-		DeweyNumber.DeweyNumberSerializer versionSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
-		NodeId.NodeIdSerializer nodeIdSerializer = NodeId.NodeIdSerializer.INSTANCE;
+		PriorityQueue<ComputationState> partialMatches = deserializeComputationStates(source);
+		PriorityQueue<ComputationState> completedMatches = deserializeComputationStates(source);
+		return new NFAState(partialMatches, completedMatches);
+	}
+
+	private PriorityQueue<ComputationState> deserializeComputationStates(DataInputView source) throws IOException {
+		PriorityQueue<ComputationState> computationStates = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 
 		int computationStateNo = source.readInt();
 		for (int i = 0; i < computationStateNo; i++) {
-			String state = stateNameSerializer.deserialize(source);
-			NodeId prevState = nodeIdSerializer.deserialize(source);
-			DeweyNumber version = versionSerializer.deserialize(source);
-			long startTimestamp = timestampSerializer.deserialize(source);
-
-			computationStates.add(ComputationState.createState(state, prevState, version, startTimestamp));
+			String state = STATE_NAME_SERIALIZER.deserialize(source);
+			NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
+			DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
+			long startTimestamp = TIMESTAMP_SERIALIZER.deserialize(source);
+
+			byte isNull = source.readByte();
+			EventId startEventId = null;
+			if (isNull == 1) {
+				startEventId = EVENT_ID_SERIALIZER.deserialize(source);
+			}
+
+			computationStates.add(ComputationState.createState(state, prevState, version, startTimestamp, startEventId));
 		}
-		return new NFAState(computationStates);
+		return computationStates;
 	}
 
 	@Override
@@ -135,7 +154,32 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		serialize(deserialize(source), target);
+		copyStates(source, target); // copy partial matches
+		copyStates(source, target); // copy completed matches
+	}
+
+	private void copyStates(DataInputView source, DataOutputView target) throws IOException {
+		int computationStateNo = source.readInt();
+		target.writeInt(computationStateNo);
+
+		for (int i = 0; i < computationStateNo; i++) {
+			String state = STATE_NAME_SERIALIZER.deserialize(source);
+			STATE_NAME_SERIALIZER.serialize(state, target);
+			NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
+			NODE_ID_SERIALIZER.serialize(prevState, target);
+			DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
+			VERSION_SERIALIZER.serialize(version, target);
+			long startTimestamp = TIMESTAMP_SERIALIZER.deserialize(source);
+			TIMESTAMP_SERIALIZER.serialize(startTimestamp, target);
+
+			byte isNull = source.readByte();
+			target.writeByte(isNull);
+
+			if (isNull == 1) {
+				EventId startEventId = EVENT_ID_SERIALIZER.deserialize(source);
+				EVENT_ID_SERIALIZER.serialize(startEventId, target);
+			}
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index a4dbc00..6e5f9db 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -50,6 +50,8 @@ import java.util.stream.Collectors;
 public class SharedBuffer<V> {
 
 	private final Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext;
+	/** Run number (first block in DeweyNumber) -> EventId. */
+	private Map<Integer, EventId> starters;
 	private final Map<EventId, Lockable<V>> eventsBuffer;
 	private final Map<NodeId, Lockable<SharedBufferNode>> pages;
 
@@ -64,11 +66,13 @@ public class SharedBuffer<V> {
 	public SharedBuffer(
 			Map<EventId, Lockable<V>> eventsBuffer,
 			Map<NodeId, Lockable<SharedBufferNode>> pages,
-			Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext) {
+			Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext,
+			Map<Integer, EventId> starters) {
 
 		this.eventsBuffer = eventsBuffer;
 		this.pages = pages;
 		this.mappingContext = mappingContext;
+		this.starters = starters;
 	}
 
 	public NodeId getNodeId(String prevState, long timestamp, int counter, V event) {
@@ -76,6 +80,10 @@ public class SharedBuffer<V> {
 			new ValueTimeWrapper<>(event, timestamp, counter)));
 	}
 
+	public EventId getStartEventId(int run) {
+		return starters.get(run);
+	}
+
 	/**
 	 * Wrapper for a value-timestamp pair.
 	 *
@@ -284,6 +292,7 @@ public class SharedBuffer<V> {
 			// read the edges of the shared buffer entries
 			int totalEdges = source.readInt();
 
+			Map<Integer, EventId> starters = new HashMap<>();
 			for (int j = 0; j < totalEdges; j++) {
 				int sourceIdx = source.readInt();
 
@@ -297,11 +306,14 @@ public class SharedBuffer<V> {
 				Tuple2<NodeId, Lockable<SharedBufferNode>> targetEntry =
 					targetIdx < 0 ? Tuple2.of(null, null) : entries.get(targetIdx);
 				sourceEntry.f1.getElement().addEdge(new SharedBufferEdge(targetEntry.f0, version));
+				if (version.length() == 1) {
+					starters.put(version.getRun(), sourceEntry.f0.getEventId());
+				}
 			}
 
 			Map<NodeId, Lockable<SharedBufferNode>> entriesMap = entries.stream().collect(Collectors.toMap(e -> e.f0, e -> e.f1));
 
-			return new SharedBuffer<>(valuesWithIds, entriesMap, mappingContext);
+			return new SharedBuffer<>(valuesWithIds, entriesMap, mappingContext, starters);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
new file mode 100644
index 0000000..e0f399f
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.ComputationState;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ */
+public abstract class AfterMatchSkipStrategy implements Serializable {
+
+	private static final long serialVersionUID = -4048930333619068531L;
+
+	/**
+	 * Discards every partial match that started before the first event of the match mapped to *PatternName*.
+	 *
+	 * @param patternName the pattern name to skip to
+	 * @return the created AfterMatchSkipStrategy
+	 */
+	public static AfterMatchSkipStrategy skipToFirst(String patternName) {
+		return new SkipToFirstStrategy(patternName);
+	}
+
+	/**
+	 * Discards every partial match that contains an event of the match preceding the last of *PatternName*.
+	 *
+	 * @param patternName the pattern name to skip to
+	 * @return the created AfterMatchSkipStrategy
+	 */
+	public static AfterMatchSkipStrategy skipToLast(String patternName) {
+		return new SkipToLastStrategy(patternName);
+	}
+
+	/**
+	 * Discards every partial match that started at or before the last event of the match.
+	 *
+	 * @return the created AfterMatchSkipStrategy
+	 */
+	public static AfterMatchSkipStrategy skipPastLastEvent() {
+		return SkipPastLastStrategy.INSTANCE;
+	}
+
+	/**
+	 * Every possible match will be emitted.
+	 *
+	 * @return the created AfterMatchSkipStrategy
+	 */
+	public static AfterMatchSkipStrategy noSkip() {
+		return NoSkipStrategy.INSTANCE;
+	}
+
+	/**
+	 * Tells if the strategy may skip some matches.
+	 *
+	 * @return false if the strategy is NO_SKIP strategy
+	 */
+	public abstract boolean isSkipStrategy();
+
+	/**
+	 * Prunes matches/partial matches based on the chosen strategy.
+	 *
+	 * @param matchesToPrune partial or completed matches that are candidates for pruning
+	 * @param matchedResult  the extracted match from which the pruning event id is derived
+	 * @param sharedBuffer   corresponding shared buffer
+	 * @throws Exception thrown if could not access the state
+	 */
+	public void prune(
+			Collection<ComputationState> matchesToPrune,
+			Collection<Map<String, List<EventId>>> matchedResult,
+			SharedBuffer<?> sharedBuffer) throws Exception {
+
+		EventId pruningId = getPruningId(matchedResult);
+		if (pruningId != null) {
+			List<ComputationState> discardStates = new ArrayList<>();
+			for (ComputationState computationState : matchesToPrune) {
+				if (computationState.getStartEventID() != null &&
+					shouldPrune(computationState.getStartEventID(), pruningId)) {
+					sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
+					discardStates.add(computationState);
+				}
+			}
+			matchesToPrune.removeAll(discardStates);
+		}
+	}
+
+	/**
+	 * Tells if the partial/completed match starting at the given id should be pruned by the given pruningId.
+	 *
+	 * @param startEventID starting event id of a partial/completed match
+	 * @param pruningId   pruningId calculated by this strategy
+	 * @return true if the match should be pruned
+	 */
+	protected abstract boolean shouldPrune(EventId startEventID, EventId pruningId);
+
+	/**
+	 * Retrieves event id of the pruning element from the given match based on the strategy.
+	 *
+	 * @param match match for which the pruning should happen
+	 * @return pruning event id
+	 */
+	protected abstract EventId getPruningId(Collection<Map<String, List<EventId>>> match);
+
+	/**
+	 * Name of pattern that processing will be skipped to.
+	 */
+	public Optional<String> getPatternName() {
+		return Optional.empty();
+	}
+
+	static EventId max(EventId o1, EventId o2) {
+		if (o2 == null) {
+			return o1;
+		}
+
+		if (o1 == null) {
+			return o2;
+		}
+
+		if (o1.compareTo(o2) >= 0) {
+			return o1;
+		} else {
+			return o2;
+		}
+	}
+
+	/** Forbid further extending. */
+	AfterMatchSkipStrategy() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/NoSkipStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/NoSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/NoSkipStrategy.java
new file mode 100644
index 0000000..2f6769d
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/NoSkipStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Every possible match will be emitted.
+ */
+public class NoSkipStrategy extends AfterMatchSkipStrategy {
+
+	private static final long serialVersionUID = -5843740153729531775L;
+
+	static final NoSkipStrategy INSTANCE = new NoSkipStrategy();
+
+	private NoSkipStrategy() {
+	}
+
+	@Override
+	public boolean isSkipStrategy() {
+		return false;
+	}
+
+	@Override
+	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
+		throw new IllegalStateException("This should never happen. Please file a bug.");
+	}
+
+	@Override
+	protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
+		throw new IllegalStateException("This should never happen. Please file a bug.");
+	}
+
+	@Override
+	public String toString() {
+		return "NoSkipStrategy{}";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java
new file mode 100644
index 0000000..952d91a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Discards every partial match that started at or before the last event of the match.
+ */
+public class SkipPastLastStrategy extends AfterMatchSkipStrategy {
+
+	public static final SkipPastLastStrategy INSTANCE = new SkipPastLastStrategy();
+
+	private static final long serialVersionUID = -8450320065949093169L;
+
+	private SkipPastLastStrategy() {
+	}
+
+	@Override
+	public boolean isSkipStrategy() {
+		return true;
+	}
+
+	@Override
+	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
+		return startEventID != null && startEventID.compareTo(pruningId) <= 0;
+	}
+
+	@Override
+	protected EventId getPruningId(final Collection<Map<String, List<EventId>>> match) {
+		EventId pruningId = null;
+		for (Map<String, List<EventId>> resultMap : match) {
+			for (List<EventId> eventList : resultMap.values()) {
+				pruningId = max(pruningId, eventList.get(eventList.size() - 1));
+			}
+		}
+
+		return pruningId;
+	}
+
+	@Override
+	public String toString() {
+		return "SkipPastLastStrategy{}";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
new file mode 100644
index 0000000..7d7be4a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Discards every partial match that started before the first event of the match mapped to *PatternName*.
+ */
+public class SkipToFirstStrategy extends AfterMatchSkipStrategy {
+
+	private static final long serialVersionUID = 7127107527654629026L;
+	private final String patternName;
+
+	SkipToFirstStrategy(String patternName) {
+		this.patternName = checkNotNull(patternName);
+	}
+
+	@Override
+	public boolean isSkipStrategy() {
+		return true;
+	}
+
+	@Override
+	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
+		return startEventID != null && startEventID.compareTo(pruningId) < 0;
+	}
+
+	@Override
+	protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
+		EventId pruniningId = null;
+		for (Map<String, List<EventId>> resultMap : match) {
+			List<EventId> pruningPattern = resultMap.get(patternName);
+			if (pruningPattern != null && !pruningPattern.isEmpty()) {
+				pruniningId = max(pruniningId, pruningPattern.get(0));
+			}
+		}
+
+		return pruniningId;
+	}
+
+	@Override
+	public Optional<String> getPatternName() {
+		return Optional.of(patternName);
+	}
+
+	@Override
+	public String toString() {
+		return "SkipToFirstStrategy{" +
+			"patternName='" + patternName + '\'' +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
new file mode 100644
index 0000000..0f6c3ed
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Discards every partial match that contains an event of the match preceding the last of *PatternName*.
+ */
+public class SkipToLastStrategy extends AfterMatchSkipStrategy {
+	private static final long serialVersionUID = 7585116990619594531L;
+	private final String patternName;
+
+	SkipToLastStrategy(String patternName) {
+		this.patternName = checkNotNull(patternName);
+	}
+
+	@Override
+	public boolean isSkipStrategy() {
+		return true;
+	}
+
+	@Override
+	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
+		// getPruningId returns null if patternName is in no match; guard so that case prunes nothing.
+		return startEventID != null && pruningId != null && startEventID.compareTo(pruningId) < 0;
+	}
+
+	@Override
+	protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
+		EventId pruningId = null;
+		for (Map<String, List<EventId>> resultMap : match) {
+			List<EventId> pruningPattern = resultMap.get(patternName);
+			if (pruningPattern != null && !pruningPattern.isEmpty()) {
+				// Candidate: last event recorded under patternName; candidates are merged via max().
+				pruningId = max(pruningId, pruningPattern.get(pruningPattern.size() - 1));
+			}
+		}
+		return pruningId;
+	}
+
+	@Override
+	public Optional<String> getPatternName() {
+		return Optional.of(patternName);
+	}
+
+	@Override
+	public String toString() {
+		return "SkipToLastStrategy{" +
+			"patternName='" + patternName + '\'' +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index dbb654c..8f49f68 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -19,11 +19,11 @@
 package org.apache.flink.cep.nfa.compiler;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
 import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.GroupPattern;
 import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
@@ -136,15 +136,15 @@ public class NFACompiler {
 		 * Check pattern after match skip strategy.
 		 */
 		private void checkPatternSkipStrategy() {
-			if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
-				afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+			if (afterMatchSkipStrategy.getPatternName().isPresent()) {
+				String patternName = afterMatchSkipStrategy.getPatternName().get();
 				Pattern<T, ?> pattern = currentPattern;
-				while (pattern.getPrevious() != null && !pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+				while (pattern.getPrevious() != null && !pattern.getName().equals(patternName)) {
 					pattern = pattern.getPrevious();
 				}
 
 				// pattern name match check.
-				if (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+				if (!pattern.getName().equals(patternName)) {
 					throw new MalformedPatternException("The pattern name specified in AfterMatchSkipStrategy " +
 						"can not be found in the given Pattern");
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
index 57d244a..c1a6ccb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
@@ -26,12 +26,13 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.Objects;
 
 /**
  * Composite key for events in {@link SharedBuffer}.
  */
-public class EventId {
+public class EventId implements Comparable<EventId> {
 	private final int id;
 	private final long timestamp;
 
@@ -48,6 +49,9 @@ public class EventId {
 		return timestamp;
 	}
 
+	public static final Comparator<EventId> COMPARATOR = Comparator.comparingLong(EventId::getTimestamp) // primary sort key: timestamp
+		.thenComparingInt(EventId::getId); // tie-breaker for equal timestamps: id
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -74,6 +78,11 @@ public class EventId {
 			'}';
 	}
 
+	@Override
+	public int compareTo(EventId o) {
+		return COMPARATOR.compare(this, o); // natural ordering is exactly the one defined by COMPARATOR
+	}
+
 	/** {@link TypeSerializer} for {@link EventId}. */
 	public static class EventIdSerializer extends TypeSerializerSingleton<EventId> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
index 32be5da..c35e4d7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
@@ -35,6 +36,7 @@ import org.apache.commons.lang3.StringUtils;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -213,11 +215,11 @@ public class SharedBuffer<V> {
 	 * @return Collection of previous relations starting with the given value
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
-	public List<Map<String, List<V>>> extractPatterns(
+	public List<Map<String, List<EventId>>> extractPatterns(
 			final NodeId nodeId,
 			final DeweyNumber version) throws Exception {
 
-		List<Map<String, List<V>>> result = new ArrayList<>();
+		List<Map<String, List<EventId>>> result = new ArrayList<>();
 
 		// stack to remember the current extraction states
 		Stack<ExtractionState> extractionStates = new Stack<>();
@@ -238,15 +240,15 @@ public class SharedBuffer<V> {
 
 				// termination criterion
 				if (currentEntry == null) {
-					final Map<String, List<V>> completePath = new LinkedHashMap<>();
+					final Map<String, List<EventId>> completePath = new LinkedHashMap<>();
 
 					while (!currentPath.isEmpty()) {
 						final NodeId currentPathEntry = currentPath.pop().f0;
 
 						String page = currentPathEntry.getPageName();
-						List<V> values = completePath
+						List<EventId> values = completePath
 							.computeIfAbsent(page, k -> new ArrayList<>());
-						values.add(eventsBuffer.get(currentPathEntry.getEventId()).getElement());
+						values.add(currentPathEntry.getEventId());
 					}
 					result.add(completePath);
 				} else {
@@ -285,6 +287,32 @@ public class SharedBuffer<V> {
 		return result;
 	}
 
+	public Map<String, List<V>> materializeMatch(Map<String, List<EventId>> match) {
+		return materializeMatch(match, new HashMap<>()); // fresh, empty cache used for this call only
+	}
+
+	public Map<String, List<V>> materializeMatch(Map<String, List<EventId>> match, Map<EventId, V> cache) {
+		// LinkedHashMap: patterns come out in the iteration order of the given match.
+		final Map<String, List<V>> result = new LinkedHashMap<>(match.size());
+		match.forEach((patternName, eventIds) -> {
+			final List<V> elements = new ArrayList<>(eventIds.size());
+			for (EventId eventId : eventIds) {
+				// Only ids missing from the cache are looked up in the events buffer.
+				elements.add(cache.computeIfAbsent(eventId, missingId -> {
+					try {
+						return eventsBuffer.get(missingId).getElement();
+					} catch (Exception ex) {
+						// A mapping function cannot throw checked exceptions, so wrap.
+						throw new WrappingRuntimeException(ex);
+					}
+				}));
+			}
+			result.put(patternName, elements);
+		});
+
+		return result;
+	}
+
 	/**
 	 * Increases the reference counter for the given entry so that it is not
 	 * accidentally removed.

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7e3e6f3..9c263b0 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -29,11 +29,11 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.NFA.MigratedNFA;
 import org.apache.flink.cep.nfa.NFAState;
 import org.apache.flink.cep.nfa.NFAStateSerializer;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.runtime.state.KeyedStateFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index bc5e2b1..bdb2897 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -28,7 +28,7 @@ import org.apache.flink.cep.PatternFlatTimeoutFunction;
 import org.apache.flink.cep.PatternSelectFunction;
 import org.apache.flink.cep.PatternStream;
 import org.apache.flink.cep.PatternTimeoutFunction;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.TimeCharacteristic;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
index fb7143f..df54b53 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
@@ -21,7 +21,7 @@ package org.apache.flink.cep.operator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.PatternFlatSelectFunction;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.util.OutputTag;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
index cc9caae..642c92a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.PatternFlatSelectFunction;
 import org.apache.flink.cep.PatternFlatTimeoutFunction;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.util.OutputTag;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
index 1b2b28d..ad335e5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
@@ -21,7 +21,7 @@ package org.apache.flink.cep.operator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.PatternSelectFunction;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
index 344f374..73ac709 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.PatternSelectFunction;
 import org.apache.flink.cep.PatternTimeoutFunction;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java
index fce408c..749ab27 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.cep.pattern;
 
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index a276d9a..c821120 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -19,8 +19,8 @@
 package org.apache.flink.cep.pattern;
 
 import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
 import org.apache.flink.cep.pattern.Quantifier.Times;
 import org.apache.flink.cep.pattern.conditions.AndCondition;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 4f2383a..9b35788 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -21,8 +21,8 @@ package org.apache.flink.cep;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.core.fs.FileSystem;