You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/03/17 15:35:26 UTC

[1/2] flink git commit: [FLINK-5932] initializeState() before legacy state restoring in operator

Repository: flink
Updated Branches:
  refs/heads/master 8765f3241 -> 521a53d9a


[FLINK-5932] initializeState() before legacy state restoring in operator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d998ff91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d998ff91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d998ff91

Branch: refs/heads/master
Commit: d998ff9175ec84aa5685fbd6ebe79341a4da8e8f
Parents: 8765f32
Author: kl0u <kk...@gmail.com>
Authored: Tue Feb 28 22:31:01 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Fri Mar 17 16:32:40 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumerBase.java          |  2 +-
 .../streaming/api/operators/AbstractStreamOperator.java   | 10 ++++++++--
 2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d998ff91/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 6858509..d409027 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -574,7 +574,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
 			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
 
-		restoredState = restoredOffsets;
+		restoredState = restoredOffsets.isEmpty() ? null : restoredOffsets;
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",

http://git-wip-us.apache.org/repos/asf/flink/blob/d998ff91/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index ef23be9..b6f86a8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -215,8 +215,6 @@ public abstract class AbstractStreamOperator<OUT>
 
 		if (restoring) {
 
-			restoreStreamCheckpointed(stateHandles);
-
 			//pass directly
 			operatorStateHandlesBackend = stateHandles.getManagedOperatorState();
 			operatorStateHandlesRaw = stateHandles.getRawOperatorState();
@@ -240,6 +238,14 @@ public abstract class AbstractStreamOperator<OUT>
 				getContainingTask().getCancelables()); // access to register streams for canceling
 
 		initializeState(initializationContext);
+
+		if (restoring) {
+
+			// finally restore the legacy state in case we are
+			// migrating from a previous Flink version.
+
+			restoreStreamCheckpointed(stateHandles);
+		}
 	}
 
 	@Deprecated


[2/2] flink git commit: [FLINK-5846] [cep] Make the CEP operators backwards compatible with Flink 1.1

Posted by kk...@apache.org.
[FLINK-5846] [cep] Make the CEP operators backwards compatible with Flink 1.1


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/521a53d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/521a53d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/521a53d9

Branch: refs/heads/master
Commit: 521a53d9ad68a3f16a32e08843a6fca2bd4e439d
Parents: d998ff9
Author: kl0u <kk...@gmail.com>
Authored: Mon Feb 20 14:51:51 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Fri Mar 17 16:32:41 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  47 ++--
 .../AbstractKeyedCEPPatternOperator.java        |  85 +++++-
 .../flink/cep/operator/CEPOperatorUtils.java    |  12 +-
 .../cep/operator/KeyedCEPPatternOperator.java   |   5 +-
 .../TimeoutKeyedCEPPatternOperator.java         |   5 +-
 .../cep/operator/CEPMigration11to13Test.java    | 268 +++++++++++++++++++
 .../flink/cep/operator/CEPOperatorTest.java     |   6 +-
 .../flink/cep/operator/CEPRescalingTest.java    |   3 +-
 .../src/test/resources/cep-keyed-snapshot-1.1   | Bin 0 -> 5612 bytes
 .../test/resources/cep-non-keyed-snapshot-1.1   | Bin 0 -> 3274 bytes
 .../MultiplexingStreamRecordSerializer.java     | 229 ++++++++++++++++
 .../streamrecord/StreamRecordSerializer.java    | 150 +++++++++++
 pom.xml                                         |   2 +
 13 files changed, 769 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 8d87fd8..12afe4e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -119,6 +119,8 @@ public class NFA<T> implements Serializable {
 	 */
 	private transient Queue<ComputationState<T>> computationStates;
 
+	private StateTransitionComparator<T>  stateTransitionComparator;
+
 	public NFA(
 			final TypeSerializer<T> eventSerializer,
 			final long windowTime,
@@ -131,6 +133,7 @@ public class NFA<T> implements Serializable {
 		this.computationStates = new LinkedList<>();
 		this.states = new HashSet<>();
 		this.startEventCounter = 1;
+		this.stateTransitionComparator =  new StateTransitionComparator<>();
 	}
 
 	public Set<State<T>> getStates() {
@@ -262,22 +265,6 @@ public class NFA<T> implements Serializable {
 	}
 
 	/**
-	 * Comparator used for imposing the assumption that IGNORE is always the last StateTransition in a state.
-	 */
-	private interface StateTransitionComparator<T> extends Comparator<StateTransition<T>>, Serializable {}
-	private final Comparator<StateTransition<T>> stateTransitionComparator = new StateTransitionComparator<T>() {
-		private static final long serialVersionUID = -2775474935413622278L;
-
-		@Override
-		public int compare(final StateTransition<T> o1, final StateTransition<T> o2) {
-			if (o1.getAction() == o2.getAction()) {
-				return 0;
-			}
-			return o1.getAction() == StateTransitionAction.IGNORE ? 1 : -1;
-		}
-	};
-
-	/**
 	 * Computes the next computation states based on the given computation state, the current event,
 	 * its timestamp and the internal state machine.
 	 *
@@ -301,6 +288,13 @@ public class NFA<T> implements Serializable {
 			State<T> currentState = states.pop();
 			final List<StateTransition<T>> stateTransitions = new ArrayList<>(currentState.getStateTransitions());
 
+			// This handles restoring from a legacy snapshot: the comparator did not exist in previous
+			// Flink versions, so in that case it is null and we have to initialize it here.
+
+			if (stateTransitionComparator == null) {
+				stateTransitionComparator = new StateTransitionComparator();
+			}
+
 			// impose the IGNORE will be processed last
 			Collections.sort(stateTransitions, stateTransitionComparator);
 
@@ -601,10 +595,7 @@ public class NFA<T> implements Serializable {
 			ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source));
 
 			try {
-				@SuppressWarnings("unchecked")
-				NFA<T> nfa = null;
-				nfa = (NFA<T>) ois.readObject();
-				return nfa;
+				return (NFA<T>) ois.readObject();
 			} catch (ClassNotFoundException e) {
 				throw new RuntimeException("Could not deserialize NFA.", e);
 			}
@@ -637,4 +628,20 @@ public class NFA<T> implements Serializable {
 			return getClass().hashCode();
 		}
 	}
+
+	/**
+	 * Comparator used for imposing the assumption that IGNORE is always the last StateTransition in a state.
+	 */
+	private static final class StateTransitionComparator<T> implements Serializable, Comparator<StateTransition<T>> {
+
+		private static final long serialVersionUID = -2775474935413622278L;
+
+		@Override
+		public int compare(final StateTransition<T> o1, final StateTransition<T> o2) {
+			if (o1.getAction() == o2.getAction()) {
+				return 0;
+			}
+			return o1.getAction() == StateTransitionAction.IGNORE ? 1 : -1;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index de7daea..b6d57cd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -25,19 +25,25 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.api.operators.InternalWatermarkCallbackService;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OnWatermarkCallback;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.Objects;
 import java.util.PriorityQueue;
@@ -55,7 +61,7 @@ import java.util.PriorityQueue;
  */
 public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	extends AbstractStreamOperator<OUT>
-	implements OneInputStreamOperator<IN, OUT> {
+	implements OneInputStreamOperator<IN, OUT>, CheckpointedRestoringOperator {
 
 	private static final long serialVersionUID = -4166778210774160757L;
 
@@ -82,18 +88,26 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
 	private final NFACompiler.NFAFactory<IN> nfaFactory;
 
+	/**
+	 * A flag used during migration: {@code true} if we are restoring from an
+	 * old keyed operator, {@code false} if from an old non-keyed operator.
+	 */
+	private final boolean migratingFromOldKeyedOperator;
+
 	public AbstractKeyedCEPPatternOperator(
 			final TypeSerializer<IN> inputSerializer,
 			final boolean isProcessingTime,
 			final KeySelector<IN, KEY> keySelector,
 			final TypeSerializer<KEY> keySerializer,
-			final NFACompiler.NFAFactory<IN> nfaFactory) {
+			final NFACompiler.NFAFactory<IN> nfaFactory,
+			final boolean migratingFromOldKeyedOperator) {
 
 		this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
 		this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
 		this.keySelector = Preconditions.checkNotNull(keySelector);
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+		this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
 	}
 
 	public TypeSerializer<IN> getInputSerializer() {
@@ -104,15 +118,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
 
-		// we have to call initializeState here and in the migration restore()
-		// method because the restore() (from legacy) is called before the
-		// initializeState().
-
-		initializeState();
-	}
-
-	private void initializeState() {
-
 		if (nfaOperatorState == null) {
 			nfaOperatorState = getRuntimeContext().getState(
 				new ValueStateDescriptor<>(
@@ -252,6 +257,57 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	 */
 	protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
 
+	//////////////////////			Backwards Compatibility			//////////////////////
+
+	@Override
+	public void restoreState(FSDataInputStream in) throws Exception {
+		// this is the flag indicating if we have udf
+		// state to restore (not needed here)
+		in.read();
+
+		DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(in);
+		InternalWatermarkCallbackService<KEY> watermarkCallbackService = getInternalWatermarkCallbackService();
+
+		if (migratingFromOldKeyedOperator) {
+			int numberEntries = inputView.readInt();
+			for (int i = 0; i <numberEntries; i++) {
+				watermarkCallbackService.registerKeyForWatermarkCallback(keySerializer.deserialize(inputView));
+			}
+		} else {
+
+			final ObjectInputStream ois = new ObjectInputStream(in);
+
+			// retrieve the NFA
+			@SuppressWarnings("unchecked")
+			NFA<IN> nfa = (NFA<IN>) ois.readObject();
+
+			// retrieve the elements that were pending in the priority queue
+			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+			PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueFactory.createPriorityQueue();
+			int entries = ois.readInt();
+			for (int i = 0; i < entries; i++) {
+				StreamElement streamElement = recordSerializer.deserialize(inputView);
+				priorityQueue.offer(streamElement.<IN>asRecord());
+			}
+
+			// finally register the retrieved state with the new keyed state.
+			setCurrentKey((byte) 0);
+			nfaOperatorState.update(nfa);
+			priorityQueueOperatorState.update(priorityQueue);
+
+			if (!isProcessingTime) {
+				// this is relevant only for event/ingestion time
+
+				// need to work around type restrictions
+				InternalWatermarkCallbackService rawWatermarkCallbackService =
+					(InternalWatermarkCallbackService) watermarkCallbackService;
+
+				rawWatermarkCallbackService.registerKeyForWatermarkCallback((byte) 0);
+			}
+			ois.close();
+		}
+	}
+
 	//////////////////////			Utility Classes			//////////////////////
 
 	/**
@@ -348,6 +404,11 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		}
 
 		@Override
+		public boolean isCompatibleWith(TypeSerializer<?> other) {
+			return equals(other) || other instanceof AbstractKeyedCEPPatternOperator.PriorityQueueSerializer;
+		}
+
+		@Override
 		public boolean equals(Object obj) {
 			if (obj instanceof PriorityQueueSerializer) {
 				@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 56ecb17..a5eef45 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -72,7 +72,8 @@ public class CEPOperatorUtils {
 					isProcessingTime,
 					keySelector,
 					keySerializer,
-					nfaFactory));
+					nfaFactory,
+					true));
 		} else {
 
 			KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
@@ -86,7 +87,8 @@ public class CEPOperatorUtils {
 					isProcessingTime,
 					keySelector,
 					keySerializer,
-					nfaFactory
+					nfaFactory,
+					false
 				)).forceNonParallel();
 		}
 
@@ -133,7 +135,8 @@ public class CEPOperatorUtils {
 					isProcessingTime,
 					keySelector,
 					keySerializer,
-					nfaFactory));
+					nfaFactory,
+					true));
 		} else {
 
 			KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
@@ -147,7 +150,8 @@ public class CEPOperatorUtils {
 					isProcessingTime,
 					keySelector,
 					keySerializer,
-					nfaFactory
+					nfaFactory,
+					false
 				)).forceNonParallel();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 5b6ffe2..21cee23 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -45,9 +45,10 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
 			boolean isProcessingTime,
 			KeySelector<IN, KEY> keySelector,
 			TypeSerializer<KEY> keySerializer,
-			NFACompiler.NFAFactory<IN> nfaFactory) {
+			NFACompiler.NFAFactory<IN> nfaFactory,
+			boolean migratingFromOldKeyedOperator) {
 
-		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
+		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 6889bb9..c6fba55 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -45,9 +45,10 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
 			boolean isProcessingTime,
 			KeySelector<IN, KEY> keySelector,
 			TypeSerializer<KEY> keySerializer,
-			NFACompiler.NFAFactory<IN> nfaFactory) {
+			NFACompiler.NFAFactory<IN> nfaFactory,
+			boolean migratingFromOldKeyedOperator) {
 
-		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
+		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
new file mode 100644
index 0000000..5a3e623
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CEPMigration11to13Test {
+
+	private static String getResourceFilename(String filename) {
+		ClassLoader cl = CEPMigration11to13Test.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		if (resource == null) {
+			throw new NullPointerException("Missing snapshot resource.");
+		}
+		return resource.getFile();
+	}
+
+	@Test
+	public void testKeyedCEPOperatorMigratation() throws Exception {
+
+		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+
+		final Event startEvent = new Event(42, "start", 1.0);
+		final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+		final Event endEvent = new Event(42, "end", 1.0);
+
+		// uncomment these lines for regenerating the snapshot on Flink 1.1
+		/*
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						keySelector,
+						IntSerializer.INSTANCE,
+						new NFAFactory()));
+		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+		harness.open();
+		harness.processElement(new StreamRecord<Event>(startEvent, 1));
+		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+		harness.processWatermark(new Watermark(2));
+		// simulate snapshot/restore with empty element queue but NFA state
+		StreamTaskState snapshot = harness.snapshot(1, 1);
+		FileOutputStream out = new FileOutputStream(
+				"src/test/resources/cep-keyed-snapshot-1.1");
+		ObjectOutputStream oos = new ObjectOutputStream(out);
+		oos.writeObject(snapshot);
+		out.close();
+		harness.close();
+		*/
+
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						new KeyedCEPPatternOperator<>(
+								Event.createTypeSerializer(),
+								false,
+								keySelector,
+								IntSerializer.INSTANCE,
+								new NFAFactory(),
+								true),
+						keySelector,
+						BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.setup();
+		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-snapshot-1.1"));
+		harness.open();
+
+		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+		harness.processElement(new StreamRecord<>(endEvent, 5));
+
+		harness.processWatermark(new Watermark(20));
+
+		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+		// watermark and the result
+		assertEquals(2, result.size());
+
+		Object resultObject = result.poll();
+		assertTrue(resultObject instanceof StreamRecord);
+		StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+		assertTrue(resultRecord.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+		assertEquals(startEvent, patternMap.get("start"));
+		assertEquals(middleEvent, patternMap.get("middle"));
+		assertEquals(endEvent, patternMap.get("end"));
+
+		harness.close();
+	}
+
+	@Test
+	public void testNonKeyedCEPFunctionMigration() throws Exception {
+
+		final Event startEvent = new Event(42, "start", 1.0);
+		final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+		final Event endEvent=  new Event(42, "end", 1.0);
+
+		// uncomment these lines for regenerating the snapshot on Flink 1.1
+		/*
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
+				new CEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						new NFAFactory()));
+		harness.open();
+		harness.processElement(new StreamRecord<Event>(startEvent, 1));
+		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+		harness.processWatermark(new Watermark(2));
+
+		// simulate snapshot/restore with empty element queue but NFA state
+		StreamTaskState snapshot = harness.snapshot(1, 1);
+		FileOutputStream out = new FileOutputStream(
+				"src/test/resources/cep-non-keyed-snapshot-1.1");
+		ObjectOutputStream oos = new ObjectOutputStream(out);
+		oos.writeObject(snapshot);
+		out.close();
+		harness.close();
+		*/
+
+		NullByteKeySelector keySelector = new NullByteKeySelector();
+
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						new KeyedCEPPatternOperator<>(
+								Event.createTypeSerializer(),
+								false,
+								keySelector,
+								ByteSerializer.INSTANCE,
+								new NFAFactory(),
+								false),
+						keySelector,
+						BasicTypeInfo.BYTE_TYPE_INFO);
+
+		harness.setup();
+		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-snapshot-1.1"));
+		harness.open();
+
+		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+		harness.processElement(new StreamRecord<>(endEvent, 5));
+
+		harness.processWatermark(new Watermark(Long.MAX_VALUE));
+
+		ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+		// watermark and the result
+		assertEquals(2, result.size());
+
+		Object resultObject = result.poll();
+		assertTrue(resultObject instanceof StreamRecord);
+		StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+		assertTrue(resultRecord.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+		assertEquals(startEvent, patternMap.get("start"));
+		assertEquals(middleEvent, patternMap.get("middle"));
+		assertEquals(endEvent, patternMap.get("end"));
+
+		harness.close();
+	}
+
+	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
+
+		private static final long serialVersionUID = 1173020762472766713L;
+
+		private final boolean handleTimeout;
+
+		private NFAFactory() {
+			this(false);
+		}
+
+		private NFAFactory(boolean handleTimeout) {
+			this.handleTimeout = handleTimeout;
+		}
+
+		@Override
+		public NFA<Event> createNFA() {
+
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
+					.followedBy("middle").subtype(SubEvent.class).where(new MiddleFilter())
+					.followedBy("end").where(new EndFilter())
+					// add a window timeout to test whether timestamps of elements in the
+					// priority queue in CEP operator are correctly checkpointed/restored
+					.within(Time.milliseconds(10L));
+
+			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+		}
+	}
+
+	private static class StartFilter implements FilterFunction<Event> {
+		private static final long serialVersionUID = 5726188262756267490L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getName().equals("start");
+		}
+	}
+
+	private static class MiddleFilter implements FilterFunction<SubEvent> {
+		private static final long serialVersionUID = 6215754202506583964L;
+
+		@Override
+		public boolean filter(SubEvent value) throws Exception {
+			return value.getVolume() > 5.0;
+		}
+	}
+
+	private static class EndFilter implements FilterFunction<Event> {
+		private static final long serialVersionUID = 7056763917392056548L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getName().equals("end");
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 4ae74b9..a99db05 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -224,7 +224,8 @@ public class CEPOperatorTest extends TestLogger {
 				false,
 				keySelector,
 				IntSerializer.INSTANCE,
-				new NFAFactory(true)),
+				new NFAFactory(true),
+				true),
 			keySelector,
 			BasicTypeInfo.INT_TYPE_INFO);
 
@@ -480,7 +481,8 @@ public class CEPOperatorTest extends TestLogger {
 			isProcessingTime,
 			keySelector,
 			IntSerializer.INSTANCE,
-			new NFAFactory());
+			new NFAFactory(),
+			true);
 	}
 
 	private static class TestKeySelector implements KeySelector<Event, Integer> {

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 78765c0..399662a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -345,7 +345,8 @@ public class CEPRescalingTest {
 				false,
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				new NFAFactory()),
+				new NFAFactory(),
+				true),
 			keySelector,
 			BasicTypeInfo.INT_TYPE_INFO,
 			maxParallelism,

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 b/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1
new file mode 100644
index 0000000..277de1d
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1
new file mode 100644
index 0000000..b5ca51e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
new file mode 100644
index 0000000..a0f5a60
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> {
+
+
+	private static final long serialVersionUID = 1L;
+
+	private static final int TAG_REC_WITH_TIMESTAMP = 0;
+	private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
+	private static final int TAG_WATERMARK = 2;
+
+
+	private final TypeSerializer<T> typeSerializer;
+
+
+	public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
+		if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
+			throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+		}
+		this.typeSerializer = requireNonNull(serializer);
+	}
+
+	public TypeSerializer<T> getContainedTypeSerializer() {
+		return this.typeSerializer;
+	}
+
+	// ------------------------------------------------------------------------
+	//  General serializer and type utils
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public MultiplexingStreamRecordSerializer<T> duplicate() {
+		TypeSerializer<T> copy = typeSerializer.duplicate();
+		return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer<T>(copy);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Type serialization, copying, instantiation
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamRecord<T> createInstance() {
+		return new StreamRecord<T>(typeSerializer.createInstance());
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public StreamElement copy(StreamElement from) {
+		// timestamps are plain long values (see serialize), so only the record value needs a deep copy
+		if (from.isRecord()) {
+			StreamRecord<T> fromRecord = from.asRecord();
+			return fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
+		}
+		else if (from.isWatermark()) {
+			// is immutable
+			return from;
+		}
+		else {
+			throw new RuntimeException();
+		}
+	}
+
+	@Override
+	public StreamElement copy(StreamElement from, StreamElement reuse) {
+		if (from.isRecord() && reuse.isRecord()) {
+			StreamRecord<T> fromRecord = from.asRecord();
+			StreamRecord<T> reuseRecord = reuse.asRecord();
+
+			T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
+			fromRecord.copyTo(valueCopy, reuseRecord);
+			return reuse;
+		}
+		else if (from.isWatermark()) {
+			// is immutable
+			return from;
+		}
+		else {
+			throw new RuntimeException("Cannot copy " + from + " -> " + reuse);
+		}
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		int tag = source.readByte();
+		target.write(tag);
+
+		if (tag == TAG_REC_WITH_TIMESTAMP) {
+			// move timestamp
+			target.writeLong(source.readLong());
+			typeSerializer.copy(source, target);
+		}
+		else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+			typeSerializer.copy(source, target);
+		}
+		else if (tag == TAG_WATERMARK) {
+			target.writeLong(source.readLong());
+		}
+		else {
+			throw new IOException("Corrupt stream, found tag: " + tag);
+		}
+	}
+
+	@Override
+	public void serialize(StreamElement value, DataOutputView target) throws IOException {
+		if (value.isRecord()) {
+			StreamRecord<T> record = value.asRecord();
+
+			if (record.hasTimestamp()) {
+				target.write(TAG_REC_WITH_TIMESTAMP);
+				target.writeLong(record.getTimestamp());
+			} else {
+				target.write(TAG_REC_WITHOUT_TIMESTAMP);
+			}
+			typeSerializer.serialize(record.getValue(), target);
+		}
+		else if (value.isWatermark()) {
+			target.write(TAG_WATERMARK);
+			target.writeLong(value.asWatermark().getTimestamp());
+		}
+		else {
+			throw new RuntimeException();
+		}
+	}
+
+	@Override
+	public StreamElement deserialize(DataInputView source) throws IOException {
+		int tag = source.readByte();
+		if (tag == TAG_REC_WITH_TIMESTAMP) {
+			long timestamp = source.readLong();
+			return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
+		}
+		else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+			return new StreamRecord<T>(typeSerializer.deserialize(source));
+		}
+		else if (tag == TAG_WATERMARK) {
+			return new Watermark(source.readLong());
+		}
+		else {
+			throw new IOException("Corrupt stream, found tag: " + tag);
+		}
+	}
+
+	@Override
+	public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException {
+		int tag = source.readByte();
+		if (tag == TAG_REC_WITH_TIMESTAMP) {
+			long timestamp = source.readLong();
+			T value = typeSerializer.deserialize(source);
+			StreamRecord<T> reuseRecord = reuse.asRecord();
+			reuseRecord.replace(value, timestamp);
+			return reuseRecord;
+		}
+		else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+			T value = typeSerializer.deserialize(source);
+			StreamRecord<T> reuseRecord = reuse.asRecord();
+			reuseRecord.replace(value);
+			return reuseRecord;
+		}
+		else if (tag == TAG_WATERMARK) {
+			return new Watermark(source.readLong());
+		}
+		else {
+			throw new IOException("Corrupt stream, found tag: " + tag);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof MultiplexingStreamRecordSerializer) {
+			MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj;
+
+			return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof MultiplexingStreamRecordSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return typeSerializer.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
new file mode 100644
index 0000000..0235ab8
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.runtime.streamrecord;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with
+ * the element.
+ *
+ * <p>
+ * {@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also
+ * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same
+ * stream with {@link StreamRecord StreamRecords}.
+ *
+ * @see MultiplexingStreamRecordSerializer
+ *
+ * @param <T> The type of value in the {@link StreamRecord}
+ */
+@Internal
+public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeSerializer<T> typeSerializer;
+	
+
+	public StreamRecordSerializer(TypeSerializer<T> serializer) {
+		if (serializer instanceof StreamRecordSerializer) {
+			throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+		}
+		this.typeSerializer = Preconditions.checkNotNull(serializer);
+	}
+
+	public TypeSerializer<T> getContainedTypeSerializer() {
+		return this.typeSerializer;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  General serializer and type utils
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamRecordSerializer<T> duplicate() {
+		TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
+		return serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy);
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public int getLength() {
+		return typeSerializer.getLength();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Type serialization, copying, instantiation
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamRecord<T> createInstance() {
+		try {
+			return new StreamRecord<T>(typeSerializer.createInstance());
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot instantiate StreamRecord.", e);
+		}
+	}
+	
+	@Override
+	public StreamRecord<T> copy(StreamRecord<T> from) {
+		return from.copy(typeSerializer.copy(from.getValue()));
+	}
+
+	@Override
+	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
+		from.copyTo(typeSerializer.copy(from.getValue(), reuse.getValue()), reuse);
+		return reuse;
+	}
+
+	@Override
+	public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
+		typeSerializer.serialize(value.getValue(), target);
+	}
+	
+	@Override
+	public StreamRecord<T> deserialize(DataInputView source) throws IOException {
+		return new StreamRecord<T>(typeSerializer.deserialize(source));
+	}
+
+	@Override
+	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
+		T element = typeSerializer.deserialize(reuse.getValue(), source);
+		reuse.replace(element);
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		typeSerializer.copy(source, target);
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof StreamRecordSerializer) {
+			StreamRecordSerializer<?> other = (StreamRecordSerializer<?>) obj;
+
+			return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof StreamRecordSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return typeSerializer.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/521a53d9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 661a72d..538aa4b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -911,6 +911,8 @@ under the License.
 						<exclude>flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot</exclude>
 						<exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint</exclude>
 						<exclude>flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb</exclude>
+						<exclude>flink-libraries/flink-cep/src/test/resources/cep-keyed-snapshot-1.1</exclude>
+						<exclude>flink-libraries/flink-cep/src/test/resources/cep-non-keyed-snapshot-1.1</exclude>
 
 						<!-- TweetInputFormat Test Data-->
 						<exclude>flink-contrib/flink-tweet-inputformat/src/main/resources/HashTagTweetSample.json</exclude>