You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/14 16:58:03 UTC

[4/5] flink git commit: [FLINK-5041] Savepoint Backwards Compatibility 1.1 -> 1.2

[FLINK-5041] Savepoint Backwards Compatibility 1.1 -> 1.2


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e95fe56b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e95fe56b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e95fe56b

Branch: refs/heads/master
Commit: e95fe56b60158fc4df511a1404f7346bd18b8f12
Parents: 8cda6a2
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Nov 24 17:29:24 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBFoldingState.java    |  6 ++--
 .../org/apache/flink/core/io/Versioned.java     | 31 ++++++++++++++++++
 .../java/org/apache/flink/util/Migration.java   | 25 ++++++++++++++
 .../savepoint/SavepointV0Serializer.java        |  7 ++--
 .../runtime/state/AbstractStateBackend.java     |  6 +++-
 .../state/MigrationKeyGroupStateHandle.java     |  3 +-
 .../state/MigrationStreamStateHandle.java       | 13 ++++++--
 .../runtime/checkpoint/savepoint/Savepoint.java | 13 ++------
 .../savepoint/SavepointV1Serializer.java        | 16 ++++-----
 .../savepoint/MigrationV0ToV1Test.java          | 13 ++------
 .../runtime/state/OperatorStateBackendTest.java |  2 +-
 .../api/operators/AbstractStreamOperator.java   | 34 +++++++++++---------
 .../operators/AbstractUdfStreamOperator.java    |  8 +++++
 13 files changed, 122 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 3018f7b..9c2bf4f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
 /**
@@ -85,7 +85,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 			if (valueBytes == null) {
 				return null;
 			}
-			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
 		} catch (IOException|RocksDBException e) {
 			throw new RuntimeException("Error while retrieving data from RocksDB", e);
 		}
@@ -103,7 +103,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 				valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
 				backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
 			} else {
-				ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+				ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
 				ACC newValue = foldFunction.fold(oldValue, value);
 				keySerializationStream.reset();
 				valueSerializer.serialize(newValue, out);

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java b/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java
new file mode 100644
index 0000000..b36d5e8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+/**
+ * This interface is implemented by classes that provide a version number. Version numbers can be used to differentiate
+ * between evolving classes.
+ */
+public interface Versioned {
+
+	/**
+	 * Returns the version number of the object. Version numbers can be used to differentiate evolving classes.
+	 */
+	int getVersion();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-core/src/main/java/org/apache/flink/util/Migration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/Migration.java b/flink-core/src/main/java/org/apache/flink/util/Migration.java
new file mode 100644
index 0000000..4bd9e39
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/Migration.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Tagging interface for migration related classes.
+ */
+public interface Migration {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index dc307e2..6c6a8f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.migration.runtime.checkpoint.KeyGroupState;
 import org.apache.flink.migration.runtime.checkpoint.SubtaskState;
 import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.migration.runtime.state.AbstractStateBackend;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
 import org.apache.flink.migration.runtime.state.StateHandle;
 import org.apache.flink.migration.runtime.state.filesystem.AbstractFileStateHandle;
@@ -266,10 +267,7 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
 		}
 
 		if (null != operatorState) {
-			mergeStateHandles.add(SIGNAL_1);
 			mergeStateHandles.add(convertStateHandle(operatorState));
-		} else {
-			mergeStateHandles.add(SIGNAL_0);
 		}
 
 		return new MigrationStreamStateHandle(new MultiStreamStateHandle(mergeStateHandles));
@@ -340,6 +338,9 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
 			byte[] data =
 					((org.apache.flink.migration.runtime.state.memory.ByteStreamStateHandle) oldStateHandle).getData();
 			return new ByteStreamStateHandle(String.valueOf(System.identityHashCode(data)), data);
+		} else if (oldStateHandle instanceof AbstractStateBackend.DataInputViewHandle) {
+			return convertStateHandle(
+					((AbstractStateBackend.DataInputViewHandle) oldStateHandle).getStreamStateHandle());
 		}
 		throw new IllegalArgumentException("Unknown state handle type: " + oldStateHandle);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
index b7932f5..1d76c06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
@@ -35,7 +35,7 @@ public abstract class AbstractStateBackend implements Serializable {
 	/**
 	 * Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle.
 	 */
-	private static final class DataInputViewHandle implements StateHandle<DataInputView> {
+	public static final class DataInputViewHandle implements StateHandle<DataInputView> {
 
 		private static final long serialVersionUID = 2891559813513532079L;
 
@@ -45,6 +45,10 @@ public abstract class AbstractStateBackend implements Serializable {
 			this.stream = stream;
 		}
 
+		public StreamStateHandle getStreamStateHandle() {
+			return stream;
+		}
+
 		@Override
 		public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
 			return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
index 1bebcb6..995d234 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Migration;
 
 @Internal
 @Deprecated
@@ -29,7 +30,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
  * This class is just a KeyGroupsStateHandle that is tagged as migration, to figure out which restore logic to apply,
  * e.g. when restoring backend data from a state handle.
  */
-public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle {
+public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle implements Migration {
 
 	private static final long serialVersionUID = -8554427169776881697L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
index e2da757..e7831a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
@@ -20,7 +20,9 @@ package org.apache.flink.migration.state;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataInputStreamWrapper;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Migration;
 
 import java.io.IOException;
 
@@ -30,7 +32,7 @@ import java.io.IOException;
  * This class is just a StreamStateHandle that is tagged as migration, to figure out which restore logic to apply, e.g.
  * when restoring backend data from a state handle.
  */
-public class MigrationStreamStateHandle implements StreamStateHandle {
+public class MigrationStreamStateHandle implements StreamStateHandle, Migration {
 
 	private static final long serialVersionUID = -2332113722532150112L;
 	private final StreamStateHandle delegate;
@@ -41,7 +43,7 @@ public class MigrationStreamStateHandle implements StreamStateHandle {
 
 	@Override
 	public FSDataInputStream openInputStream() throws IOException {
-		return delegate.openInputStream();
+		return new MigrationFSInputStream(delegate.openInputStream());
 	}
 
 	@Override
@@ -53,4 +55,11 @@ public class MigrationStreamStateHandle implements StreamStateHandle {
 	public long getStateSize() {
 		return delegate.getStateSize();
 	}
+
+	static class MigrationFSInputStream extends FSDataInputStreamWrapper implements Migration {
+
+		public MigrationFSInputStream(FSDataInputStream inputStream) {
+			super(inputStream);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index 643f14c..baad05f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.core.io.Versioned;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TaskState;
 
@@ -34,17 +35,7 @@ import java.util.Collection;
  * <p>Savepoints are serialized via a {@link SavepointSerializer} and stored
  * via a {@link SavepointStore}.
  */
-public interface Savepoint {
-
-	/**
-	 * Returns the savepoint version.
-	 *
-	 * <p>This version is independent of the Flink version, e.g. multiple Flink
-	 * versions can work the same savepoint version.
-	 *
-	 * @return Savepoint version
-	 */
-	int getVersion();
+public interface Savepoint extends Versioned {
 
 	/**
 	 * Returns the checkpoint ID of the savepoint.

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index cd3e87f..4d16c13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -211,7 +211,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 				duration);
 	}
 
-	public static void serializeKeyGroupStateHandle(
+	private static void serializeKeyGroupStateHandle(
 			KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
 		if (stateHandle != null) {
@@ -227,7 +227,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		}
 	}
 
-	public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
+	private static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
 		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
 			return null;
@@ -247,7 +247,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		}
 	}
 
-	public static void serializeOperatorStateHandle(
+	private static void serializeOperatorStateHandle(
 			OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
 		if (stateHandle != null) {
@@ -258,8 +258,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 				dos.writeUTF(entry.getKey());
 				long[] offsets = entry.getValue();
 				dos.writeInt(offsets.length);
-				for (int i = 0; i < offsets.length; ++i) {
-					dos.writeLong(offsets[i]);
+				for (long offset : offsets) {
+					dos.writeLong(offset);
 				}
 			}
 			serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
@@ -268,7 +268,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		}
 	}
 
-	public static OperatorStateHandle deserializeOperatorStateHandle(
+	private static OperatorStateHandle deserializeOperatorStateHandle(
 			DataInputStream dis) throws IOException {
 
 		final int type = dis.readByte();
@@ -292,7 +292,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		}
 	}
 
-	public static void serializeStreamStateHandle(
+	private static void serializeStreamStateHandle(
 			StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
 		if (stateHandle == null) {
@@ -319,7 +319,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		dos.flush();
 	}
 
-	public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+	private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
 		int type = dis.read();
 		if (NULL_HANDLE == type) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
index 02365c7..4208fe5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
@@ -147,13 +147,8 @@ public class MigrationV0ToV1Test {
 
 							//check operator state
 							expTestState.f0 = 1;
-							if (p % 3 != 0) {
-								assertEquals(1, is.read());
-								actTestState = InstantiationUtil.deserializeObject(is, cl);
-								assertEquals(expTestState, actTestState);
-							} else {
-								assertEquals(0, is.read());
-							}
+							actTestState = InstantiationUtil.deserializeObject(is, cl);
+							assertEquals(expTestState, actTestState);
 						}
 					}
 
@@ -210,9 +205,7 @@ public class MigrationV0ToV1Test {
 						state.setFunctionState(new SerializedStateHandle<Serializable>(testState));
 					}
 					testState = new Tuple4<>(1, i, j, k);
-					if (j % 3 != 0) {
-						state.setOperatorState(new SerializedStateHandle<>(testState));
-					}
+					state.setOperatorState(new SerializedStateHandle<>(testState));
 
 					if ((0 == k) && (i % 3 != 0)) {
 						HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = new HashMap<>(2);

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 2db8735..51e3739 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -160,4 +160,4 @@ public class OperatorStateBackendTest {
 		}
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 839abf8..f9b711e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -28,11 +28,11 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
@@ -58,10 +58,10 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -201,7 +201,6 @@ public abstract class AbstractStreamOperator<OUT>
 
 		if (restoring) {
 
-			// TODO check that there is EITHER old OR new state in handles!
 			restoreStreamCheckpointed(stateHandles);
 
 			//pass directly
@@ -230,18 +229,23 @@ public abstract class AbstractStreamOperator<OUT>
 	@Deprecated
 	private void restoreStreamCheckpointed(OperatorStateHandles stateHandles) throws Exception {
 		StreamStateHandle state = stateHandles.getLegacyOperatorState();
-		if (this instanceof StreamCheckpointedOperator && null != state) {
+		if (null != state) {
+			if (this instanceof StreamCheckpointedOperator) {
 
-			LOG.debug("Restore state of task {} in chain ({}).",
-					stateHandles.getOperatorChainIndex(), getContainingTask().getName());
+				LOG.debug("Restore state of task {} in chain ({}).",
+						stateHandles.getOperatorChainIndex(), getContainingTask().getName());
 
-			FSDataInputStream is = state.openInputStream();
-			try {
-				getContainingTask().getCancelables().registerClosable(is);
-				((StreamCheckpointedOperator) this).restoreState(is);
-			} finally {
-				getContainingTask().getCancelables().unregisterClosable(is);
-				is.close();
+				FSDataInputStream is = state.openInputStream();
+				try {
+					getContainingTask().getCancelables().registerClosable(is);
+					((StreamCheckpointedOperator) this).restoreState(is);
+				} finally {
+					getContainingTask().getCancelables().unregisterClosable(is);
+					is.close();
+				}
+			} else {
+				throw new Exception(
+						"Found legacy operator state for operator that does not implement StreamCheckpointedOperator.");
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index c1f783f..1404958 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Migration;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -192,6 +193,8 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 			} catch (Exception e) {
 				throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
 			}
+		} else if (userFunction instanceof CheckpointedRestoring) {
+			out.write(0);
 		}
 	}
 
@@ -213,6 +216,11 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 					}
 				}
 			}
+		} else if (in instanceof Migration) {
+			int hasUdfState = in.read();
+			if (hasUdfState == 1) {
+				throw new Exception("Found UDF state but operator is not instance of CheckpointedRestoring");
+			}
 		}
 	}