You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:15 UTC

[04/50] [abbrv] flink git commit: [FLINK-3318] Backward compatibility of CEP NFA

[FLINK-3318] Backward compatibility of CEP NFA


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20fb090
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20fb090
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20fb090

Branch: refs/heads/table-retraction
Commit: d20fb090c31858bc0372a8c84228d796558d56b0
Parents: 9001c4e
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Tue Mar 21 10:53:07 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Thu Mar 23 10:47:55 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 162 ++++++-
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  49 ++
 .../java/org/apache/flink/cep/nfa/State.java    |  19 +-
 .../flink/cep/nfa/compiler/NFACompiler.java     | 116 +++++
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 194 +++++++-
 .../cep/operator/CEPMigration12to13Test.java    | 477 +++++++++++++++++++
 .../test/resources/cep-branching-snapshot-1.2   | Bin 0 -> 6736 bytes
 .../resources/cep-single-pattern-snapshot-1.2   | Bin 0 -> 3311 bytes
 .../test/resources/cep-starting-snapshot-1.2    | Bin 0 -> 6526 bytes
 9 files changed, 986 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 3d42248..ab03566 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedHashMultimap;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -25,18 +27,22 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
+import javax.annotation.Nullable;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.OptionalDataException;
 import java.io.Serializable;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -85,9 +91,9 @@ public class NFA<T> implements Serializable {
 	private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer;
 
 	/**
-	 * 	Buffer used to store the matched events.
+	 * 	Used only for backward compatibility. Buffer used to store the matched events.
 	 */
-	private final SharedBuffer<String, T> sharedBuffer;
+	private final SharedBuffer<State<T>, T> sharedBuffer = null;
 
 	/**
 	 * A set of all the valid NFA states, as returned by the
@@ -110,12 +116,22 @@ public class NFA<T> implements Serializable {
 	private final boolean handleTimeout;
 
 	/**
+	 * Used only for backward compatibility.
+	 */
+	private int startEventCounter;
+
+	/**
 	 * Current set of {@link ComputationState computation states} within the state machine.
 	 * These are the "active" intermediate states that are waiting for new matching
 	 * events to transition to new valid states.
 	 */
 	private transient Queue<ComputationState<T>> computationStates;
 
+	/**
+	 * 	Buffer used to store the matched events.
+	 */
+	private final SharedBuffer<String, T> stringSharedBuffer;
+
 	public NFA(
 			final TypeSerializer<T> eventSerializer,
 			final long windowTime,
@@ -124,7 +140,7 @@ public class NFA<T> implements Serializable {
 		this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
 		this.windowTime = windowTime;
 		this.handleTimeout = handleTimeout;
-		sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+		stringSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
 		computationStates = new LinkedList<>();
 
 		states = new HashSet<>();
@@ -156,7 +172,7 @@ public class NFA<T> implements Serializable {
 	 * {@code false} otherwise.
 	 */
 	public boolean isEmpty() {
-		return sharedBuffer.isEmpty();
+		return stringSharedBuffer.isEmpty();
 	}
 
 	/**
@@ -194,9 +210,14 @@ public class NFA<T> implements Serializable {
 					}
 				}
 
-				// remove computation state which has exceeded the window length
-				sharedBuffer.release(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
-				sharedBuffer.remove(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
+				stringSharedBuffer.release(
+						computationState.getPreviousState().getName(),
+						computationState.getEvent(),
+						computationState.getTimestamp());
+				stringSharedBuffer.remove(
+						computationState.getPreviousState().getName(),
+						computationState.getEvent(),
+						computationState.getTimestamp());
 
 				newComputationStates = Collections.emptyList();
 			} else if (event != null) {
@@ -212,8 +233,8 @@ public class NFA<T> implements Serializable {
 					result.addAll(matches);
 
 					// remove found patterns because they are no longer needed
-					sharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
-					sharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+					stringSharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+					stringSharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
 				} else {
 					// add new computation state; it will be processed once the next event arrives
 					computationStates.add(newComputationState);
@@ -230,7 +251,7 @@ public class NFA<T> implements Serializable {
 
 				// remove all elements which are expired
 				// with respect to the window length
-				sharedBuffer.prune(pruningTimestamp);
+				stringSharedBuffer.prune(pruningTimestamp);
 			}
 		}
 
@@ -244,7 +265,7 @@ public class NFA<T> implements Serializable {
 			NFA<T> other = (NFA<T>) obj;
 
 			return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
-				sharedBuffer.equals(other.sharedBuffer) &&
+				stringSharedBuffer.equals(other.stringSharedBuffer) &&
 				states.equals(other.states) &&
 				windowTime == other.windowTime;
 		} else {
@@ -254,7 +275,7 @@ public class NFA<T> implements Serializable {
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime);
+		return Objects.hash(nonDuplicatingTypeSerializer, stringSharedBuffer, states, windowTime);
 	}
 
 	private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
@@ -376,7 +397,7 @@ public class NFA<T> implements Serializable {
 								computationState.getStartTimestamp()
 							)
 						);
-						sharedBuffer.lock(
+						stringSharedBuffer.lock(
 							computationState.getPreviousState().getName(),
 							computationState.getEvent(),
 							computationState.getTimestamp());
@@ -397,14 +418,14 @@ public class NFA<T> implements Serializable {
 					final long startTimestamp;
 					if (computationState.isStartState()) {
 						startTimestamp = timestamp;
-						sharedBuffer.put(
+						stringSharedBuffer.put(
 							consumingState.getName(),
 							event,
 							timestamp,
 							currentVersion);
 					} else {
 						startTimestamp = computationState.getStartTimestamp();
-						sharedBuffer.put(
+						stringSharedBuffer.put(
 							consumingState.getName(),
 							event,
 							timestamp,
@@ -415,7 +436,7 @@ public class NFA<T> implements Serializable {
 					}
 
 					// a new computation state is referring to the shared entry
-					sharedBuffer.lock(consumingState.getName(), event, timestamp);
+					stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
 
 					resultingComputationStates.add(ComputationState.createState(
 						newState,
@@ -429,7 +450,7 @@ public class NFA<T> implements Serializable {
 					//check if newly created state is optional (have a PROCEED path to Final state)
 					final State<T> finalState = findFinalStateAfterProceed(newState, event);
 					if (finalState != null) {
-						sharedBuffer.lock(consumingState.getName(), event, timestamp);
+						stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
 						resultingComputationStates.add(ComputationState.createState(
 							finalState,
 							consumingState,
@@ -450,12 +471,12 @@ public class NFA<T> implements Serializable {
 
 		if (computationState.getEvent() != null) {
 			// release the shared entry referenced by the current computation state.
-			sharedBuffer.release(
+			stringSharedBuffer.release(
 				computationState.getPreviousState().getName(),
 				computationState.getEvent(),
 				computationState.getTimestamp());
 			// try to remove unnecessary shared buffer entries
-			sharedBuffer.remove(
+			stringSharedBuffer.remove(
 				computationState.getPreviousState().getName(),
 				computationState.getEvent(),
 				computationState.getTimestamp());
@@ -546,7 +567,7 @@ public class NFA<T> implements Serializable {
 	 * @return Collection of event sequences which end in the given computation state
 	 */
 	private Collection<Map<String, T>> extractPatternMatches(final ComputationState<T> computationState) {
-		Collection<LinkedHashMultimap<String, T>> paths = sharedBuffer.extractPatterns(
+		Collection<LinkedHashMultimap<String, T>> paths = stringSharedBuffer.extractPatterns(
 			computationState.getPreviousState().getName(),
 			computationState.getEvent(),
 			computationState.getTimestamp(),
@@ -592,6 +613,8 @@ public class NFA<T> implements Serializable {
 		nonDuplicatingTypeSerializer.clearReferences();
 	}
 
+	private final static String BEGINNING_STATE_NAME = "$beginningState$";
+
 	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		ois.defaultReadObject();
 
@@ -599,15 +622,103 @@ public class NFA<T> implements Serializable {
 
 		computationStates = new LinkedList<>();
 
+		final List<ComputationState<T>> readComputationStates = new ArrayList<>(numberComputationStates);
+
+		boolean afterMigration = false;
 		for (int i = 0; i < numberComputationStates; i++) {
 			ComputationState<T> computationState = readComputationState(ois);
+			if (computationState.getState().getName().equals(BEGINNING_STATE_NAME)) {
+				afterMigration = true;
+			}
 
-			computationStates.offer(computationState);
+			readComputationStates.add(computationState);
+		}
+
+		if (afterMigration && !readComputationStates.isEmpty()) {
+			try {
+				//Backwards compatibility
+				this.computationStates.addAll(migrateNFA(readComputationStates));
+				final Field newSharedBufferField = NFA.class.getDeclaredField("stringSharedBuffer");
+				final Field sharedBufferField = NFA.class.getDeclaredField("sharedBuffer");
+				sharedBufferField.setAccessible(true);
+				newSharedBufferField.setAccessible(true);
+				newSharedBufferField.set(this, SharedBuffer.migrateSharedBuffer(this.sharedBuffer));
+				sharedBufferField.set(this, null);
+				sharedBufferField.setAccessible(false);
+				newSharedBufferField.setAccessible(false);
+			} catch (Exception e) {
+				throw new IllegalStateException("Could not migrate from earlier version", e);
+			}
+		} else {
+			this.computationStates.addAll(readComputationStates);
 		}
 
 		nonDuplicatingTypeSerializer.clearReferences();
 	}
 
+	/**
+	 * Needed for backward compatibility. First migrates the {@link State} graph see {@link NFACompiler#migrateGraph(State)}.
+	 * Than recreates the {@link ComputationState}s with the new {@link State} graph.
+	 * @param readStates computation states read from snapshot
+	 * @return collection of migrated computation states
+	 */
+	private Collection<ComputationState<T>> migrateNFA(Collection<ComputationState<T>> readStates) {
+		final ArrayList<ComputationState<T>> computationStates = new ArrayList<>();
+
+		final State<T> startState = Iterators.find(
+			readStates.iterator(),
+			new Predicate<ComputationState<T>>() {
+				@Override
+				public boolean apply(@Nullable ComputationState<T> input) {
+					return input != null && input.getState().getName().equals(BEGINNING_STATE_NAME);
+				}
+			}).getState();
+
+		final Map<String, State<T>> convertedStates = NFACompiler.migrateGraph(startState);
+
+		for (ComputationState<T> readState : readStates) {
+			if (!readState.isStartState()) {
+				final String previousName = readState.getState().getName();
+				final String currentName = Iterators.find(
+					readState.getState().getStateTransitions().iterator(),
+					new Predicate<StateTransition<T>>() {
+						@Override
+						public boolean apply(@Nullable StateTransition<T> input) {
+							return input != null && input.getAction() == StateTransitionAction.TAKE;
+						}
+					}).getTargetState().getName();
+
+
+				final State<T> previousState = convertedStates.get(previousName);
+
+				computationStates.add(ComputationState.createState(
+					convertedStates.get(currentName),
+					previousState,
+					readState.getEvent(),
+					readState.getTimestamp(),
+					readState.getVersion(),
+					readState.getStartTimestamp()
+				));
+			}
+		}
+
+		final String startName = Iterators.find(convertedStates.values().iterator(), new Predicate<State<T>>() {
+			@Override
+			public boolean apply(@Nullable State<T> input) {
+				return input != null && input.isStart();
+			}
+		}).getName();
+
+		computationStates.add(ComputationState.createStartState(
+			convertedStates.get(startName),
+			new DeweyNumber(this.startEventCounter)));
+
+		this.states.clear();
+		this.states.addAll(convertedStates.values());
+
+		return computationStates;
+	}
+
 	private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException {
 		oos.writeObject(computationState.getState());
 		oos.writeObject(computationState.getPreviousState());
@@ -629,7 +740,13 @@ public class NFA<T> implements Serializable {
 	@SuppressWarnings("unchecked")
 	private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		final State<T> state = (State<T>)ois.readObject();
-		final State<T> previousState = (State<T>)ois.readObject();
+		State<T> previousState;
+		try {
+			previousState = (State<T>)ois.readObject();
+		} catch (OptionalDataException e) {
+			previousState = null;
+		}
+
 		final long timestamp = ois.readLong();
 		final DeweyNumber version = (DeweyNumber)ois.readObject();
 		final long startTimestamp = ois.readLong();
@@ -647,6 +764,7 @@ public class NFA<T> implements Serializable {
 		return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp);
 	}
 
+
 	/**
 	 * Generates a state name from a given name template and an index.
 	 * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index e6a8c75..d5b7876 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.LinkedHashMultimap;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -463,6 +464,54 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		}
 	}
 
+	private SharedBuffer(
+		TypeSerializer<V> valueSerializer,
+		Map<K, SharedBufferPage<K, V>> pages) {
+		this.valueSerializer = valueSerializer;
+		this.pages = pages;
+	}
+
+	/**
+	 * For backward compatibility only. Previously the key in {@link SharedBuffer} was {@link State}.
+	 * Now it is {@link String}.
+	 */
+	@Internal
+	static <T> SharedBuffer<String, T> migrateSharedBuffer(SharedBuffer<State<T>, T> buffer) {
+
+		final Map<String, SharedBufferPage<String, T>> pageMap = new HashMap<>();
+		final Map<SharedBufferEntry<State<T>, T>, SharedBufferEntry<String, T>> entries = new HashMap<>();
+
+		for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
+			final SharedBufferPage<String, T> newPage = new SharedBufferPage<>(page.getKey().getName());
+			pageMap.put(newPage.getKey(), newPage);
+
+			for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
+				final SharedBufferEntry<String, T> newSharedBufferEntry = new SharedBufferEntry<>(
+					pageEntry.getKey(),
+					newPage);
+				newSharedBufferEntry.referenceCounter = pageEntry.getValue().referenceCounter;
+				entries.put(pageEntry.getValue(), newSharedBufferEntry);
+				newPage.entries.put(pageEntry.getKey(), newSharedBufferEntry);
+			}
+		}
+
+		for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
+			for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
+				final SharedBufferEntry<String, T> newEntry = entries.get(pageEntry.getValue());
+				for (SharedBufferEdge<State<T>, T> edge : pageEntry.getValue().edges) {
+					final SharedBufferEntry<String, T> targetNewEntry = entries.get(edge.getTarget());
+
+					final SharedBufferEdge<String, T> newEdge = new SharedBufferEdge<>(
+						targetNewEntry,
+						edge.getVersion());
+					newEntry.edges.add(newEdge);
+				}
+			}
+		}
+
+		return new SharedBuffer<>(buffer.valueSerializer, pageMap);
+	}
+
 	private SharedBufferEntry<K, V> get(
 		final K key,
 		final V value,

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 7bcb6ea..27e0dcd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -20,9 +20,12 @@ package org.apache.flink.cep.nfa;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -62,7 +65,6 @@ public class State<T> implements Serializable {
 		return stateTransitions;
 	}
 
-
 	private void addStateTransition(
 		final StateTransitionAction action,
 		final State<T> targetState,
@@ -132,4 +134,19 @@ public class State<T> implements Serializable {
 		Final, // the state is a final state for the NFA
 		Normal // the state is neither a start nor a final state
 	}
+
+	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+		ois.defaultReadObject();
+
+		//Backward compatibility. Previous version of StateTransition did not have source state
+		if (!stateTransitions.isEmpty() && stateTransitions.iterator().next().getSourceState() == null) {
+			final List<StateTransition<T>> tmp = new ArrayList<>();
+			tmp.addAll(this.stateTransitions);
+
+			this.stateTransitions.clear();
+			for (StateTransition<T> transition : tmp) {
+				addStateTransition(transition.getAction(), transition.getTargetState(), transition.getCondition());
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index b476c49..8bd8612 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,10 +18,15 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
+import org.apache.flink.cep.nfa.StateTransitionAction;
 import org.apache.flink.cep.pattern.FilterFunctions;
 import org.apache.flink.cep.pattern.FollowedByPattern;
 import org.apache.flink.cep.pattern.MalformedPatternException;
@@ -31,12 +36,15 @@ import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
+import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -363,6 +371,114 @@ public class NFACompiler {
 	}
 
 	/**
+	 * Used for migrating CEP graphs prior to 1.3. It removes the dummy start, adds the dummy end, and translates all
+	 * states to consuming ones by moving all TAKEs and IGNOREs to the next state. This method assumes each state
+	 * has at most one TAKE and one IGNORE and name of each state is unique. No PROCEED transition is allowed!
+	 *
+	 * @param oldStartState dummy start state of old graph
+	 * @param <T> type of events
+	 * @return map of new states, where key is the name of a state and value is the state itself
+	 */
+	@Internal
+	public static <T> Map<String, State<T>> migrateGraph(State<T> oldStartState) {
+		State<T> oldFirst = oldStartState;
+		State<T> oldSecond = oldStartState.getStateTransitions().iterator().next().getTargetState();
+
+		StateTransition<T> oldFirstToSecondTake = Iterators.find(
+			oldFirst.getStateTransitions().iterator(),
+			new Predicate<StateTransition<T>>() {
+				@Override
+				public boolean apply(@Nullable StateTransition<T> input) {
+					return input != null && input.getAction() == StateTransitionAction.TAKE;
+				}
+
+			});
+
+		StateTransition<T> oldFirstIgnore = Iterators.find(
+			oldFirst.getStateTransitions().iterator(),
+			new Predicate<StateTransition<T>>() {
+				@Override
+				public boolean apply(@Nullable StateTransition<T> input) {
+					return input != null && input.getAction() == StateTransitionAction.IGNORE;
+				}
+
+			}, null);
+
+		StateTransition<T> oldSecondToThirdTake = Iterators.find(
+			oldSecond.getStateTransitions().iterator(),
+			new Predicate<StateTransition<T>>() {
+				@Override
+				public boolean apply(@Nullable StateTransition<T> input) {
+					return input != null && input.getAction() == StateTransitionAction.TAKE;
+				}
+
+			}, null);
+
+		final Map<String, State<T>> convertedStates = new HashMap<>();
+		State<T> newSecond;
+		State<T> newFirst = new State<>(oldSecond.getName(), State.StateType.Start);
+		convertedStates.put(newFirst.getName(), newFirst);
+		while (oldSecondToThirdTake != null) {
+
+			newSecond = new State<T>(oldSecondToThirdTake.getTargetState().getName(), State.StateType.Normal);
+			convertedStates.put(newSecond.getName(), newSecond);
+			newFirst.addTake(newSecond, oldFirstToSecondTake.getCondition());
+
+			if (oldFirstIgnore != null) {
+				newFirst.addIgnore(oldFirstIgnore.getCondition());
+			}
+
+			oldFirst = oldSecond;
+
+			oldFirstToSecondTake = Iterators.find(
+				oldFirst.getStateTransitions().iterator(),
+				new Predicate<StateTransition<T>>() {
+					@Override
+					public boolean apply(@Nullable StateTransition<T> input) {
+						return input != null && input.getAction() == StateTransitionAction.TAKE;
+					}
+
+				});
+
+			oldFirstIgnore = Iterators.find(
+				oldFirst.getStateTransitions().iterator(),
+				new Predicate<StateTransition<T>>() {
+					@Override
+					public boolean apply(@Nullable StateTransition<T> input) {
+						return input != null && input.getAction() == StateTransitionAction.IGNORE;
+					}
+
+				}, null);
+
+			oldSecond = oldSecondToThirdTake.getTargetState();
+
+			oldSecondToThirdTake = Iterators.find(
+				oldSecond.getStateTransitions().iterator(),
+				new Predicate<StateTransition<T>>() {
+					@Override
+					public boolean apply(@Nullable StateTransition<T> input) {
+						return input != null && input.getAction() == StateTransitionAction.TAKE;
+					}
+
+				}, null);
+
+			newFirst = newSecond;
+		}
+
+		final State<T> endingState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
+
+		newFirst.addTake(endingState, oldFirstToSecondTake.getCondition());
+
+		if (oldFirstIgnore != null) {
+			newFirst.addIgnore(oldFirstIgnore.getCondition());
+		}
+
+		convertedStates.put(endingState.getName(), endingState);
+
+		return convertedStates;
+	}
+
+	/**
 	 * Factory interface for {@link NFA}.
 	 *
 	 * @param <T> Type of the input events which are processed by the NFA

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 825ba957..5b05f19 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class NFAITCase extends TestLogger {
 
@@ -421,7 +422,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testComplexBranchingAfterKleeneStar() {
+	public void testComplexBranchingAfterZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -519,7 +520,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testKleeneStar() {
+	public void testZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -581,7 +582,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testEagerKleeneStar() {
+	public void testEagerZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -646,7 +647,7 @@ public class NFAITCase extends TestLogger {
 
 
 	@Test
-	public void testBeginWithKleeneStar() {
+	public void testBeginWithZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event middleEvent1 = new Event(40, "a", 2.0);
@@ -704,7 +705,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testKleeneStarAfterKleeneStar() {
+	public void testZeroOrMoreAfterZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -779,7 +780,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testKleeneStarAfterBranching() {
+	public void testZeroOrMoreAfterBranching() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -865,7 +866,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testStrictContinuityNoResultsAfterKleeneStar() {
+	public void testStrictContinuityNoResultsAfterZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event start = new Event(40, "d", 2.0);
@@ -923,7 +924,7 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
-	public void testStrictContinuityResultsAfterKleeneStar() {
+	public void testStrictContinuityResultsAfterZeroOrMore() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event start = new Event(40, "d", 2.0);
@@ -1663,4 +1664,181 @@ public class NFAITCase extends TestLogger {
 		), resultingPatterns);
 	}
 
+	/**
+	 * Clearing SharedBuffer
+	 */
+
+	@Test
+	public void testTimesClearingBuffer() {
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).next("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).within(Time.milliseconds(8));
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		nfa.process(startEvent, 1);
+		nfa.process(middleEvent1, 2);
+		nfa.process(middleEvent2, 3);
+		nfa.process(middleEvent3, 4);
+		nfa.process(end1, 6);
+
+		//pruning element
+		nfa.process(null, 10);
+
+		assertEquals(true, nfa.isEmpty());
+	}
+
+	@Test
+	public void testOptionalClearingBuffer() {
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).optional().followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).within(Time.milliseconds(8));
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		nfa.process(startEvent, 1);
+		nfa.process(middleEvent, 5);
+		nfa.process(end1, 6);
+
+		//pruning element
+		nfa.process(null, 10);
+
+		assertEquals(true, nfa.isEmpty());
+	}
+
+	@Test
+	public void testAtLeastOneClearingBuffer() {
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).within(Time.milliseconds(8));
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		nfa.process(startEvent, 1);
+		nfa.process(middleEvent1, 3);
+		nfa.process(middleEvent2, 4);
+		nfa.process(end1, 6);
+
+		//pruning element
+		nfa.process(null, 10);
+
+		assertEquals(true, nfa.isEmpty());
+	}
+
+
+	@Test
+	public void testZeroOrMoreClearingBuffer() {
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).within(Time.milliseconds(8));
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		nfa.process(startEvent, 1);
+		nfa.process(middleEvent1, 3);
+		nfa.process(middleEvent2, 4);
+		nfa.process(end1, 6);
+
+		//pruning element
+		nfa.process(null, 10);
+
+		assertEquals(true, nfa.isEmpty());
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
new file mode 100644
index 0000000..65fa733
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CEPMigration12to13Test {
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = CEPMigration12to13Test.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		if (resource == null) {
+			throw new NullPointerException("Missing snapshot resource.");
+		}
+		return resource.getFile();
+	}
+
+	@Test
+	public void testMigrationAfterBranchingPattern() throws Exception {
+
+		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+
+		final Event startEvent = new Event(42, "start", 1.0);
+		final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+		final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
+		final Event endEvent = new Event(42, "end", 1.0);
+
+		// uncomment these lines for regenerating the snapshot on Flink 1.2
+//		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+//			new KeyedOneInputStreamOperatorTestHarness<>(
+//				new KeyedCEPPatternOperator<>(
+//					Event.createTypeSerializer(),
+//					false,
+//					keySelector,
+//					IntSerializer.INSTANCE,
+//					new NFAFactory(),
+//					true),
+//				keySelector,
+//				BasicTypeInfo.INT_TYPE_INFO);
+//
+//		harness.setup();
+//		harness.open();
+//		harness.processElement(new StreamRecord<Event>(startEvent, 1));
+//		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+//		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+//		harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+//		harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+//		harness.processWatermark(new Watermark(5));
+//		// simulate snapshot/restore with empty element queue but NFA state
+//		OperatorStateHandles snapshot = harness.snapshot(1, 1);
+//		FileOutputStream out = new FileOutputStream(
+//				"src/test/resources/cep-branching-snapshot-1.2");
+//		ObjectOutputStream oos = new ObjectOutputStream(out);
+//		oos.writeObject(snapshot.getOperatorChainIndex());
+//		oos.writeObject(snapshot.getLegacyOperatorState());
+//		oos.writeObject(snapshot.getManagedKeyedState());
+//		oos.writeObject(snapshot.getRawKeyedState());
+//		oos.writeObject(snapshot.getManagedOperatorState());
+//		oos.writeObject(snapshot.getRawOperatorState());
+//		out.close();
+//		harness.close();
+
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+					Event.createTypeSerializer(),
+					false,
+					keySelector,
+					IntSerializer.INSTANCE,
+					new NFAFactory(),
+					true),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.setup();
+		final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+			"cep-branching-snapshot-1.2")));
+		final OperatorStateHandles snapshot = new OperatorStateHandles(
+			(int) ois.readObject(),
+			(StreamStateHandle) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject()
+		);
+		harness.initializeState(snapshot);
+		harness.open();
+
+		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+		harness.processElement(new StreamRecord<>(endEvent, 5));
+
+		harness.processWatermark(new Watermark(20));
+
+		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+		// watermark and 2 results
+		assertEquals(3, result.size());
+
+		Object resultObject1 = result.poll();
+		assertTrue(resultObject1 instanceof StreamRecord);
+		StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+		assertTrue(resultRecord1.getValue() instanceof Map);
+
+		Object resultObject2 = result.poll();
+		assertTrue(resultObject2 instanceof StreamRecord);
+		StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+		assertTrue(resultRecord2.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+
+		assertEquals(startEvent, patternMap1.get("start"));
+		assertEquals(middleEvent1, patternMap1.get("middle"));
+		assertEquals(endEvent, patternMap1.get("end"));
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+
+		assertEquals(startEvent, patternMap2.get("start"));
+		assertEquals(middleEvent2, patternMap2.get("middle"));
+		assertEquals(endEvent, patternMap2.get("end"));
+
+		harness.close();
+	}
+
+	@Test
+	public void testStartingNewPatternAfterMigration() throws Exception {
+
+		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+
+		final Event startEvent1 = new Event(42, "start", 1.0);
+		final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+		final Event startEvent2 = new Event(42, "start", 5.0);
+		final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
+		final Event endEvent = new Event(42, "end", 1.0);
+
+		// uncomment these lines for regenerating the snapshot on Flink 1.2
+//		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+//			new KeyedOneInputStreamOperatorTestHarness<>(
+//				new KeyedCEPPatternOperator<>(
+//					Event.createTypeSerializer(),
+//					false,
+//					keySelector,
+//					IntSerializer.INSTANCE,
+//					new NFAFactory(),
+//					true),
+//				keySelector,
+//				BasicTypeInfo.INT_TYPE_INFO);
+//
+//		harness.setup();
+//		harness.open();
+//		harness.processElement(new StreamRecord<Event>(startEvent1, 1));
+//		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+//		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+//		harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+//		harness.processWatermark(new Watermark(5));
+//		// simulate snapshot/restore with empty element queue but NFA state
+//		OperatorStateHandles snapshot = harness.snapshot(1, 1);
+//		FileOutputStream out = new FileOutputStream(
+//				"src/test/resources/cep-starting-snapshot-1.2");
+//		ObjectOutputStream oos = new ObjectOutputStream(out);
+//		oos.writeObject(snapshot.getOperatorChainIndex());
+//		oos.writeObject(snapshot.getLegacyOperatorState());
+//		oos.writeObject(snapshot.getManagedKeyedState());
+//		oos.writeObject(snapshot.getRawKeyedState());
+//		oos.writeObject(snapshot.getManagedOperatorState());
+//		oos.writeObject(snapshot.getRawOperatorState());
+//		out.close();
+//		harness.close();
+
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+					Event.createTypeSerializer(),
+					false,
+					keySelector,
+					IntSerializer.INSTANCE,
+					new NFAFactory(),
+					true),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.setup();
+		final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+			"cep-starting-snapshot-1.2")));
+		final OperatorStateHandles snapshot = new OperatorStateHandles(
+			(int) ois.readObject(),
+			(StreamStateHandle) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject()
+		);
+		harness.initializeState(snapshot);
+		harness.open();
+
+		harness.processElement(new StreamRecord<>(startEvent2, 5));
+		harness.processElement(new StreamRecord<Event>(middleEvent2, 6));
+		harness.processElement(new StreamRecord<>(endEvent, 7));
+
+		harness.processWatermark(new Watermark(20));
+
+		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+		// watermark and 3 results
+		assertEquals(4, result.size());
+
+		Object resultObject1 = result.poll();
+		assertTrue(resultObject1 instanceof StreamRecord);
+		StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+		assertTrue(resultRecord1.getValue() instanceof Map);
+
+		Object resultObject2 = result.poll();
+		assertTrue(resultObject2 instanceof StreamRecord);
+		StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+		assertTrue(resultRecord2.getValue() instanceof Map);
+
+		Object resultObject3 = result.poll();
+		assertTrue(resultObject3 instanceof StreamRecord);
+		StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+		assertTrue(resultRecord3.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+
+		assertEquals(startEvent1, patternMap1.get("start"));
+		assertEquals(middleEvent1, patternMap1.get("middle"));
+		assertEquals(endEvent, patternMap1.get("end"));
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+
+		assertEquals(startEvent1, patternMap2.get("start"));
+		assertEquals(middleEvent2, patternMap2.get("middle"));
+		assertEquals(endEvent, patternMap2.get("end"));
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap3 = (Map<String, Event>) resultRecord3.getValue();
+
+		assertEquals(startEvent2, patternMap3.get("start"));
+		assertEquals(middleEvent2, patternMap3.get("middle"));
+		assertEquals(endEvent, patternMap3.get("end"));
+
+		harness.close();
+	}
+
+	@Test
+	public void testSinglePatternAfterMigration() throws Exception {
+
+		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+
+		final Event startEvent1 = new Event(42, "start", 1.0);
+
+		// uncomment these lines for regenerating the snapshot on Flink 1.2
+//		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+//			new KeyedOneInputStreamOperatorTestHarness<>(
+//				new KeyedCEPPatternOperator<>(
+//					Event.createTypeSerializer(),
+//					false,
+//					keySelector,
+//					IntSerializer.INSTANCE,
+//					new SinglePatternNFAFactory(),
+//					true),
+//				keySelector,
+//				BasicTypeInfo.INT_TYPE_INFO);
+//
+//		harness.setup();
+//		harness.open();
+//		harness.processWatermark(new Watermark(5));
+//		// simulate snapshot/restore with empty element queue but NFA state
+//		OperatorStateHandles snapshot = harness.snapshot(1, 1);
+//		FileOutputStream out = new FileOutputStream(
+//				"src/test/resources/cep-single-pattern-snapshot-1.2");
+//		ObjectOutputStream oos = new ObjectOutputStream(out);
+//		oos.writeObject(snapshot.getOperatorChainIndex());
+//		oos.writeObject(snapshot.getLegacyOperatorState());
+//		oos.writeObject(snapshot.getManagedKeyedState());
+//		oos.writeObject(snapshot.getRawKeyedState());
+//		oos.writeObject(snapshot.getManagedOperatorState());
+//		oos.writeObject(snapshot.getRawOperatorState());
+//		out.close();
+//		harness.close();
+
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+			new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+					Event.createTypeSerializer(),
+					false,
+					keySelector,
+					IntSerializer.INSTANCE,
+					new SinglePatternNFAFactory(),
+					true),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.setup();
+		final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+			"cep-single-pattern-snapshot-1.2")));
+		final OperatorStateHandles snapshot = new OperatorStateHandles(
+			(int) ois.readObject(),
+			(StreamStateHandle) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<KeyGroupsStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject(),
+			(Collection<OperatorStateHandle>) ois.readObject()
+		);
+		harness.initializeState(snapshot);
+		harness.open();
+
+		harness.processElement(new StreamRecord<>(startEvent1, 5));
+
+		harness.processWatermark(new Watermark(20));
+
+		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+		// watermark and the result
+		assertEquals(2, result.size());
+
+		Object resultObject = result.poll();
+		assertTrue(resultObject instanceof StreamRecord);
+		StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+		assertTrue(resultRecord.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+		assertEquals(startEvent1, patternMap.get("start"));
+
+		harness.close();
+	}
+
+	private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory<Event> {
+
+		private static final long serialVersionUID = 1173020762472766713L;
+
+		private final boolean handleTimeout;
+
+		private SinglePatternNFAFactory() {
+			this(false);
+		}
+
+		private SinglePatternNFAFactory(boolean handleTimeout) {
+			this.handleTimeout = handleTimeout;
+		}
+
+		@Override
+		public NFA<Event> createNFA() {
+
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
+				.within(Time.milliseconds(10L));
+
+			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+		}
+	}
+
+	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
+
+		private static final long serialVersionUID = 1173020762472766713L;
+
+		private final boolean handleTimeout;
+
+		private NFAFactory() {
+			this(false);
+		}
+
+		private NFAFactory(boolean handleTimeout) {
+			this.handleTimeout = handleTimeout;
+		}
+
+		@Override
+		public NFA<Event> createNFA() {
+
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
+				.followedBy("middle")
+				.subtype(SubEvent.class)
+				.where(new MiddleFilter())
+				.followedBy("end")
+				.where(new EndFilter())
+				// add a window timeout to test whether timestamps of elements in the
+				// priority queue in CEP operator are correctly checkpointed/restored
+				.within(Time.milliseconds(10L));
+
+			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+		}
+	}
+
+	private static class StartFilter implements FilterFunction<Event> {
+		private static final long serialVersionUID = 5726188262756267490L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getName().equals("start");
+		}
+	}
+
+	private static class MiddleFilter implements FilterFunction<SubEvent> {
+		private static final long serialVersionUID = 6215754202506583964L;
+
+		@Override
+		public boolean filter(SubEvent value) throws Exception {
+			return value.getVolume() > 5.0;
+		}
+	}
+
+	private static class EndFilter implements FilterFunction<Event> {
+		private static final long serialVersionUID = 7056763917392056548L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getName().equals("end");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
new file mode 100644
index 0000000..47f710e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
new file mode 100644
index 0000000..255f46a
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
new file mode 100644
index 0000000..c41f6c2
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 differ