You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/08/24 18:22:38 UTC

[07/11] flink git commit: [FLINK-7461] Remove Backwards compatibility with <= Flink 1.1

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
deleted file mode 100644
index 16f3769..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0;
-import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
-import org.apache.flink.migration.runtime.state.KvStateSnapshot;
-import org.apache.flink.migration.runtime.state.memory.MemValueState;
-import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
-import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
-import org.apache.flink.migration.util.MigrationInstantiationUtil;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-@SuppressWarnings("deprecation")
-public class MigrationV0ToV1Test {
-
-	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
-
-	/**
-	 * Simple test of savepoint methods.
-	 */
-	@Test
-	public void testSavepointMigrationV0ToV1() throws Exception {
-
-		String target = tmp.getRoot().getAbsolutePath();
-
-		assertEquals(0, tmp.getRoot().listFiles().length);
-
-		long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
-		int numTaskStates = 4;
-		int numSubtaskStates = 16;
-
-		Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> expected =
-				createTaskStatesOld(numTaskStates, numSubtaskStates);
-
-		SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
-
-		assertEquals(SavepointV0.VERSION, savepoint.getVersion());
-		assertEquals(checkpointId, savepoint.getCheckpointId());
-		assertEquals(expected, savepoint.getOldTaskStates());
-
-		assertFalse(savepoint.getOldTaskStates().isEmpty());
-
-		Exception latestException = null;
-		Path path = null;
-		FSDataOutputStream fdos = null;
-
-		FileSystem fs = null;
-
-		try {
-
-			// Try to create a FS output stream
-			for (int attempt = 0; attempt < 10; attempt++) {
-				path = new Path(target, FileUtils.getRandomFilename("savepoint-"));
-
-				if (fs == null) {
-					fs = FileSystem.get(path.toUri());
-				}
-
-				try {
-					fdos = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
-					break;
-				} catch (Exception e) {
-					latestException = e;
-				}
-			}
-
-			if (fdos == null) {
-				throw new IOException("Failed to create file output stream at " + path, latestException);
-			}
-
-			try (DataOutputStream dos = new DataOutputStream(fdos)) {
-				dos.writeInt(SavepointStore.MAGIC_NUMBER);
-				dos.writeInt(savepoint.getVersion());
-				SavepointV0Serializer.INSTANCE.serializeOld(savepoint, dos);
-			}
-
-			ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
-			Savepoint sp = SavepointStore.loadSavepoint(path.toString(), cl);
-			int t = 0;
-			for (TaskState taskState : sp.getTaskStates()) {
-				for (int p = 0; p < taskState.getParallelism(); ++p) {
-					SubtaskState subtaskState = taskState.getState(p);
-					ChainedStateHandle<StreamStateHandle> legacyOperatorState = subtaskState.getLegacyOperatorState();
-					for (int c = 0; c < legacyOperatorState.getLength(); ++c) {
-						StreamStateHandle stateHandle = legacyOperatorState.get(c);
-						try (InputStream is = stateHandle.openInputStream()) {
-							Tuple4<Integer, Integer, Integer, Integer> expTestState = new Tuple4<>(0, t, p, c);
-							Tuple4<Integer, Integer, Integer, Integer> actTestState;
-							//check function state
-							if (p % 4 != 0) {
-								assertEquals(1, is.read());
-								actTestState = InstantiationUtil.deserializeObject(is, cl);
-								assertEquals(expTestState, actTestState);
-							} else {
-								assertEquals(0, is.read());
-							}
-
-							//check operator state
-							expTestState.f0 = 1;
-							actTestState = InstantiationUtil.deserializeObject(is, cl);
-							assertEquals(expTestState, actTestState);
-						}
-					}
-
-					//check keyed state
-					KeyedStateHandle keyedStateHandle = subtaskState.getManagedKeyedState();
-
-					if (t % 3 != 0) {
-
-						assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle);
-
-						KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
-
-						assertEquals(1, keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
-						assertEquals(p, keyGroupsStateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup());
-
-						ByteStreamStateHandle stateHandle =
-								(ByteStreamStateHandle) keyGroupsStateHandle.getDelegateStateHandle();
-						HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState =
-								MigrationInstantiationUtil.deserializeObject(stateHandle.getData(), cl);
-
-						assertEquals(2, testKeyedState.size());
-						for (KvStateSnapshot<?, ?, ?, ?> snapshot : testKeyedState.values()) {
-							MemValueState.Snapshot<?, ?, ?> castedSnapshot = (MemValueState.Snapshot<?, ?, ?>) snapshot;
-							byte[] data = castedSnapshot.getData();
-							assertEquals(t, data[0]);
-							assertEquals(p, data[1]);
-						}
-					} else {
-						assertEquals(null, keyedStateHandle);
-					}
-				}
-
-				++t;
-			}
-
-			savepoint.dispose();
-
-		} finally {
-			// Dispose
-			SavepointStore.removeSavepointFile(path.toString());
-		}
-	}
-
-	private static Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> createTaskStatesOld(
-			int numTaskStates, int numSubtaskStates) throws Exception {
-
-		List<org.apache.flink.migration.runtime.checkpoint.TaskState> taskStates = new ArrayList<>(numTaskStates);
-
-		for (int i = 0; i < numTaskStates; i++) {
-			org.apache.flink.migration.runtime.checkpoint.TaskState taskState =
-					new org.apache.flink.migration.runtime.checkpoint.TaskState(new JobVertexID(), numSubtaskStates);
-			for (int j = 0; j < numSubtaskStates; j++) {
-
-				StreamTaskState[] streamTaskStates = new StreamTaskState[2];
-
-				for (int k = 0; k < streamTaskStates.length; k++) {
-					StreamTaskState state = new StreamTaskState();
-					Tuple4<Integer, Integer, Integer, Integer> testState = new Tuple4<>(0, i, j, k);
-					if (j % 4 != 0) {
-						state.setFunctionState(new SerializedStateHandle<Serializable>(testState));
-					}
-					testState = new Tuple4<>(1, i, j, k);
-					state.setOperatorState(new SerializedStateHandle<>(testState));
-
-					if ((0 == k) && (i % 3 != 0)) {
-						HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = new HashMap<>(2);
-						for (int l = 0; l < 2; ++l) {
-							String name = "keyed-" + l;
-							KvStateSnapshot<?, ?, ?, ?> testKeyedSnapshot =
-									new MemValueState.Snapshot<>(
-											IntSerializer.INSTANCE,
-											VoidNamespaceSerializer.INSTANCE,
-											IntSerializer.INSTANCE,
-											new ValueStateDescriptor<>(name, Integer.class, 0),
-											new byte[]{(byte) i, (byte) j});
-							testKeyedState.put(name, testKeyedSnapshot);
-						}
-						state.setKvStates(testKeyedState);
-					}
-					streamTaskStates[k] = state;
-				}
-
-				StreamTaskStateList streamTaskStateList = new StreamTaskStateList(streamTaskStates);
-				org.apache.flink.migration.util.SerializedValue<
-						org.apache.flink.migration.runtime.state.StateHandle<?>> handle =
-						new org.apache.flink.migration.util.SerializedValue<
-								org.apache.flink.migration.runtime.state.StateHandle<?>>(streamTaskStateList);
-
-				taskState.putState(j, new org.apache.flink.migration.runtime.checkpoint.SubtaskState(handle, 0, 0));
-			}
-
-			taskStates.add(taskState);
-		}
-
-		return taskStates;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 933c7a0..173730a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
@@ -72,6 +73,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorRef;
@@ -98,6 +100,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
@@ -560,7 +563,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
 			TaskStateSnapshot taskStateHandles) throws Exception {
 			int subtaskIndex = getIndexInSubtaskGroup();
 			if (subtaskIndex < recoveredStates.length) {
-				try (FSDataInputStream in = taskStateHandles.getSubtaskStateMappings().iterator().next().getValue().getLegacyOperatorState().openInputStream()) {
+				OperatorStateHandle operatorStateHandle = extractSingletonOperatorState(taskStateHandles);
+				try (FSDataInputStream in = operatorStateHandle.openInputStream()) {
 					recoveredStates[subtaskIndex] = InstantiationUtil.deserializeObject(in, getUserCodeClassLoader());
 				}
 			}
@@ -572,11 +576,21 @@ public class JobManagerHARecoveryTest extends TestLogger {
 					String.valueOf(UUID.randomUUID()),
 					InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
 
+			Map<String, OperatorStateHandle.StateMetaInfo> stateNameToPartitionOffsets = new HashMap<>(1);
+			stateNameToPartitionOffsets.put(
+				"test-state",
+				new OperatorStateHandle.StateMetaInfo(new long[]{0L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+
+			OperatorStateHandle operatorStateHandle = new OperatorStateHandle(stateNameToPartitionOffsets, byteStreamStateHandle);
+
 			TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot();
 			checkpointStateHandles.putSubtaskStateByOperatorID(
 				OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()),
-				new OperatorSubtaskState(byteStreamStateHandle)
-			);
+				new OperatorSubtaskState(
+					Collections.singletonList(operatorStateHandle),
+					Collections.emptyList(),
+					Collections.emptyList(),
+					Collections.emptyList()));
 
 			getEnvironment().acknowledgeCheckpoint(
 					checkpointMetaData.getCheckpointId(),
@@ -614,5 +628,17 @@ public class JobManagerHARecoveryTest extends TestLogger {
 		public static long[] getRecoveredStates() {
 			return recoveredStates;
 		}
+
+		private static OperatorStateHandle extractSingletonOperatorState(TaskStateSnapshot taskStateHandles) {
+			Set<Map.Entry<OperatorID, OperatorSubtaskState>> subtaskStateMappings = taskStateHandles.getSubtaskStateMappings();
+			Preconditions.checkNotNull(subtaskStateMappings);
+			Preconditions.checkState(subtaskStateMappings.size()  == 1);
+			OperatorSubtaskState subtaskState = subtaskStateMappings.iterator().next().getValue();
+			Collection<OperatorStateHandle> managedOperatorState =
+				Preconditions.checkNotNull(subtaskState).getManagedOperatorState();
+			Preconditions.checkNotNull(managedOperatorState);
+			Preconditions.checkState(managedOperatorState.size()  == 1);
+			return managedOperatorState.iterator().next();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index d022cdc..b36ac86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -75,7 +75,6 @@ public class CheckpointMessagesTest {
 			checkpointStateHandles.putSubtaskStateByOperatorID(
 				new OperatorID(),
 				new OperatorSubtaskState(
-					CheckpointCoordinatorTest.generateStreamStateHandle(new MyHandle()),
 					CheckpointCoordinatorTest.generatePartitionableStateHandle(new JobVertexID(), 0, 2, 8, false),
 					null,
 					CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, Collections.singletonList(new MyHandle())),

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
deleted file mode 100644
index dd6148c..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.streaming.api.graph;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraphHasher;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-
-import org.apache.flink.shaded.guava18.com.google.common.hash.HashFunction;
-import org.apache.flink.shaded.guava18.com.google.common.hash.Hasher;
-import org.apache.flink.shaded.guava18.com.google.common.hash.Hashing;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.Charset;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-
-import static org.apache.flink.util.StringUtils.byteToHexString;
-
-/**
- * StreamGraphHasher from Flink 1.1. This contains duplicated code to ensure that the algorithm does not change with
- * future Flink versions.
- *
- * <p>DO NOT MODIFY THIS CLASS
- */
-public class StreamGraphHasherV1 implements StreamGraphHasher {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV1.class);
-
-	@Override
-	public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
-		// The hash function used to generate the hash
-		final HashFunction hashFunction = Hashing.murmur3_128(0);
-		final Map<Integer, byte[]> hashes = new HashMap<>();
-
-		Set<Integer> visited = new HashSet<>();
-		Queue<StreamNode> remaining = new ArrayDeque<>();
-
-		// We need to make the source order deterministic. The source IDs are
-		// not returned in the same order, which means that submitting the same
-		// program twice might result in different traversal, which breaks the
-		// deterministic hash assignment.
-		List<Integer> sources = new ArrayList<>();
-		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
-			sources.add(sourceNodeId);
-		}
-		Collections.sort(sources);
-
-		//
-		// Traverse the graph in a breadth-first manner. Keep in mind that
-		// the graph is not a tree and multiple paths to nodes can exist.
-		//
-
-		// Start with source nodes
-		for (Integer sourceNodeId : sources) {
-			remaining.add(streamGraph.getStreamNode(sourceNodeId));
-			visited.add(sourceNodeId);
-		}
-
-		StreamNode currentNode;
-		while ((currentNode = remaining.poll()) != null) {
-			// Generate the hash code. Because multiple path exist to each
-			// node, we might not have all required inputs available to
-			// generate the hash code.
-			if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
-				// Add the child nodes
-				for (StreamEdge outEdge : currentNode.getOutEdges()) {
-					StreamNode child = outEdge.getTargetVertex();
-
-					if (!visited.contains(child.getId())) {
-						remaining.add(child);
-						visited.add(child.getId());
-					}
-				}
-			} else {
-				// We will revisit this later.
-				visited.remove(currentNode.getId());
-			}
-		}
-
-		return hashes;
-	}
-
-	/**
-	 * Generates a hash for the node and returns whether the operation was
-	 * successful.
-	 *
-	 * @param node         The node to generate the hash for
-	 * @param hashFunction The hash function to use
-	 * @param hashes       The current state of generated hashes
-	 * @return <code>true</code> if the node hash has been generated.
-	 * <code>false</code>, otherwise. If the operation is not successful, the
-	 * hash needs be generated at a later point when all input is available.
-	 * @throws IllegalStateException If node has user-specified hash and is
-	 *                               intermediate node of a chain
-	 */
-	private boolean generateNodeHash(
-			StreamNode node,
-			HashFunction hashFunction,
-			Map<Integer, byte[]> hashes,
-			boolean isChainingEnabled) {
-
-		// Check for user-specified ID
-		String userSpecifiedHash = node.getTransformationUID();
-
-		if (userSpecifiedHash == null) {
-			// Check that all input nodes have their hashes computed
-			for (StreamEdge inEdge : node.getInEdges()) {
-				// If the input node has not been visited yet, the current
-				// node will be visited again at a later point when all input
-				// nodes have been visited and their hashes set.
-				if (!hashes.containsKey(inEdge.getSourceId())) {
-					return false;
-				}
-			}
-
-			Hasher hasher = hashFunction.newHasher();
-			byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
-
-			if (hashes.put(node.getId(), hash) != null) {
-				// Sanity check
-				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
-						"twice. This is probably a bug in the JobGraph generator.");
-			}
-
-			return true;
-		} else {
-			Hasher hasher = hashFunction.newHasher();
-			byte[] hash = generateUserSpecifiedHash(node, hasher);
-
-			for (byte[] previousHash : hashes.values()) {
-				if (Arrays.equals(previousHash, hash)) {
-					throw new IllegalArgumentException("Hash collision on user-specified ID. " +
-							"Most likely cause is a non-unique ID. Please check that all IDs " +
-							"specified via `uid(String)` are unique.");
-				}
-			}
-
-			if (hashes.put(node.getId(), hash) != null) {
-				// Sanity check
-				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
-						"twice. This is probably a bug in the JobGraph generator.");
-			}
-
-			return true;
-		}
-	}
-
-	/**
-	 * Generates a hash from a user-specified ID.
-	 */
-	private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
-		hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));
-
-		return hasher.hash().asBytes();
-	}
-
-	/**
-	 * Generates a deterministic hash from node-local properties and input and
-	 * output edges.
-	 */
-	private byte[] generateDeterministicHash(
-			StreamNode node,
-			Hasher hasher,
-			Map<Integer, byte[]> hashes,
-			boolean isChainingEnabled) {
-
-		// Include stream node to hash. We use the current size of the computed
-		// hashes as the ID. We cannot use the node's ID, because it is
-		// assigned from a static counter. This will result in two identical
-		// programs having different hashes.
-		generateNodeLocalHash(node, hasher, hashes.size());
-
-		// Include chained nodes to hash
-		for (StreamEdge outEdge : node.getOutEdges()) {
-			if (isChainable(outEdge, isChainingEnabled)) {
-				StreamNode chainedNode = outEdge.getTargetVertex();
-
-				// Use the hash size again, because the nodes are chained to
-				// this node. This does not add a hash for the chained nodes.
-				generateNodeLocalHash(chainedNode, hasher, hashes.size());
-			}
-		}
-
-		byte[] hash = hasher.hash().asBytes();
-
-		// Make sure that all input nodes have their hash set before entering
-		// this loop (calling this method).
-		for (StreamEdge inEdge : node.getInEdges()) {
-			byte[] otherHash = hashes.get(inEdge.getSourceId());
-
-			// Sanity check
-			if (otherHash == null) {
-				throw new IllegalStateException("Missing hash for input node "
-						+ inEdge.getSourceVertex() + ". Cannot generate hash for "
-						+ node + ".");
-			}
-
-			for (int j = 0; j < hash.length; j++) {
-				hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
-			}
-		}
-
-		if (LOG.isDebugEnabled()) {
-			String udfClassName = "";
-			if (node.getOperator() instanceof AbstractUdfStreamOperator) {
-				udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
-						.getUserFunction().getClass().getName();
-			}
-
-			LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
-					"'" + node.toString() + "' {id: " + node.getId() + ", " +
-					"parallelism: " + node.getParallelism() + ", " +
-					"user function: " + udfClassName + "}");
-		}
-
-		return hash;
-	}
-
-	private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
-		StreamNode upStreamVertex = edge.getSourceVertex();
-		StreamNode downStreamVertex = edge.getTargetVertex();
-
-		StreamOperator<?> headOperator = upStreamVertex.getOperator();
-		StreamOperator<?> outOperator = downStreamVertex.getOperator();
-
-		return downStreamVertex.getInEdges().size() == 1
-				&& outOperator != null
-				&& headOperator != null
-				&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
-				&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
-				&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
-				headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
-				&& (edge.getPartitioner() instanceof ForwardPartitioner)
-				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
-				&& isChainingEnabled;
-	}
-
-	private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
-		hasher.putInt(id);
-
-		hasher.putInt(node.getParallelism());
-
-		if (node.getOperator() instanceof AbstractUdfStreamOperator) {
-			String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
-					.getUserFunction().getClass().getName();
-
-			hasher.putString(udfClassName, Charset.forName("UTF-8"));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
deleted file mode 100644
index b1471b2..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.streaming.runtime.streamrecord;
-
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Legacy multiplexing {@link TypeSerializer} for stream records, watermarks and other stream
- * elements.
- */
-public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> {
-
-
-	private static final long serialVersionUID = 1L;
-
-	private static final int TAG_REC_WITH_TIMESTAMP = 0;
-	private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
-	private static final int TAG_WATERMARK = 2;
-
-
-	private final TypeSerializer<T> typeSerializer;
-
-	public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
-		if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
-			throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
-		}
-		this.typeSerializer = requireNonNull(serializer);
-	}
-
-	public TypeSerializer<T> getContainedTypeSerializer() {
-		return this.typeSerializer;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public MultiplexingStreamRecordSerializer<T> duplicate() {
-		TypeSerializer<T> copy = typeSerializer.duplicate();
-		return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer<T>(copy);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	@Override
-	public StreamRecord<T> createInstance() {
-		return new StreamRecord<T>(typeSerializer.createInstance());
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public StreamElement copy(StreamElement from) {
-		// we can reuse the timestamp since Instant is immutable
-		if (from.isRecord()) {
-			StreamRecord<T> fromRecord = from.asRecord();
-			return fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
-		}
-		else if (from.isWatermark()) {
-			// is immutable
-			return from;
-		}
-		else {
-			throw new RuntimeException();
-		}
-	}
-
-	@Override
-	public StreamElement copy(StreamElement from, StreamElement reuse) {
-		if (from.isRecord() && reuse.isRecord()) {
-			StreamRecord<T> fromRecord = from.asRecord();
-			StreamRecord<T> reuseRecord = reuse.asRecord();
-
-			T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
-			fromRecord.copyTo(valueCopy, reuseRecord);
-			return reuse;
-		}
-		else if (from.isWatermark()) {
-			// is immutable
-			return from;
-		}
-		else {
-			throw new RuntimeException("Cannot copy " + from + " -> " + reuse);
-		}
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		int tag = source.readByte();
-		target.write(tag);
-
-		if (tag == TAG_REC_WITH_TIMESTAMP) {
-			// move timestamp
-			target.writeLong(source.readLong());
-			typeSerializer.copy(source, target);
-		}
-		else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
-			typeSerializer.copy(source, target);
-		}
-		else if (tag == TAG_WATERMARK) {
-			target.writeLong(source.readLong());
-		}
-		else {
-			throw new IOException("Corrupt stream, found tag: " + tag);
-		}
-	}
-
-	@Override
-	public void serialize(StreamElement value, DataOutputView target) throws IOException {
-		if (value.isRecord()) {
-			StreamRecord<T> record = value.asRecord();
-
-			if (record.hasTimestamp()) {
-				target.write(TAG_REC_WITH_TIMESTAMP);
-				target.writeLong(record.getTimestamp());
-			} else {
-				target.write(TAG_REC_WITHOUT_TIMESTAMP);
-			}
-			typeSerializer.serialize(record.getValue(), target);
-		}
-		else if (value.isWatermark()) {
-			target.write(TAG_WATERMARK);
-			target.writeLong(value.asWatermark().getTimestamp());
-		}
-		else {
-			throw new RuntimeException();
-		}
-	}
-
-	@Override
-	public StreamElement deserialize(DataInputView source) throws IOException {
-		int tag = source.readByte();
-		if (tag == TAG_REC_WITH_TIMESTAMP) {
-			long timestamp = source.readLong();
-			return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
-		}
-		else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
-			return new StreamRecord<T>(typeSerializer.deserialize(source));
-		}
-		else if (tag == TAG_WATERMARK) {
-			return new Watermark(source.readLong());
-		}
-		else {
-			throw new IOException("Corrupt stream, found tag: " + tag);
-		}
-	}
-
-	@Override
-	public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException {
-		int tag = source.readByte();
-		if (tag == TAG_REC_WITH_TIMESTAMP) {
-			long timestamp = source.readLong();
-			T value = typeSerializer.deserialize(source);
-			StreamRecord<T> reuseRecord = reuse.asRecord();
-			reuseRecord.replace(value, timestamp);
-			return reuseRecord;
-		}
-		else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
-			T value = typeSerializer.deserialize(source);
-			StreamRecord<T> reuseRecord = reuse.asRecord();
-			reuseRecord.replace(value);
-			return reuseRecord;
-		}
-		else if (tag == TAG_WATERMARK) {
-			return new Watermark(source.readLong());
-		}
-		else {
-			throw new IOException("Corrupt stream, found tag: " + tag);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Serializer configuration snapshotting & compatibility
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public MultiplexingStreamRecordSerializerConfigSnapshot snapshotConfiguration() {
-		return new MultiplexingStreamRecordSerializerConfigSnapshot<>(typeSerializer);
-	}
-
-	@Override
-	public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-		if (configSnapshot instanceof MultiplexingStreamRecordSerializerConfigSnapshot) {
-			Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig =
-				((MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-			CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousTypeSerializerAndConfig.f0,
-					UnloadableDummyTypeSerializer.class,
-					previousTypeSerializerAndConfig.f1,
-					typeSerializer);
-
-			if (!compatResult.isRequiresMigration()) {
-				return CompatibilityResult.compatible();
-			} else if (compatResult.getConvertDeserializer() != null) {
-				return CompatibilityResult.requiresMigration(
-					new MultiplexingStreamRecordSerializer<>(
-						new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
-			}
-		}
-
-		return CompatibilityResult.requiresMigration();
-	}
-
-	/**
-	 * Configuration snapshot specific to the {@link MultiplexingStreamRecordSerializer}.
-	 */
-	public static final class MultiplexingStreamRecordSerializerConfigSnapshot<T>
-			extends CompositeTypeSerializerConfigSnapshot {
-
-		private static final int VERSION = 1;
-
-		/** This empty nullary constructor is required for deserializing the configuration. */
-		public MultiplexingStreamRecordSerializerConfigSnapshot() {}
-
-		public MultiplexingStreamRecordSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) {
-			super(typeSerializer);
-		}
-
-		@Override
-		public int getVersion() {
-			return VERSION;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof MultiplexingStreamRecordSerializer) {
-			MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj;
-
-			return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof MultiplexingStreamRecordSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		return typeSerializer.hashCode();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
deleted file mode 100644
index e018ba0..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.streaming.runtime.streamrecord;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with
- * the element.
- *
- * <p>{@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also
- * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same
- * stream with {@link StreamRecord StreamRecords}.
- *
- * @see MultiplexingStreamRecordSerializer
- *
- * @param <T> The type of value in the {@link StreamRecord}
- */
-@Internal
-public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final TypeSerializer<T> typeSerializer;
-
-	public StreamRecordSerializer(TypeSerializer<T> serializer) {
-		if (serializer instanceof StreamRecordSerializer) {
-			throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
-		}
-		this.typeSerializer = Preconditions.checkNotNull(serializer);
-	}
-
-	public TypeSerializer<T> getContainedTypeSerializer() {
-		return this.typeSerializer;
-	}
-
-	// ------------------------------------------------------------------------
-	//  General serializer and type utils
-	// ------------------------------------------------------------------------
-
-	@Override
-	public StreamRecordSerializer<T> duplicate() {
-		TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
-		return serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy);
-	}
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public int getLength() {
-		return typeSerializer.getLength();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Type serialization, copying, instantiation
-	// ------------------------------------------------------------------------
-
-	@Override
-	public StreamRecord<T> createInstance() {
-		try {
-			return new StreamRecord<T>(typeSerializer.createInstance());
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot instantiate StreamRecord.", e);
-		}
-	}
-
-	@Override
-	public StreamRecord<T> copy(StreamRecord<T> from) {
-		return from.copy(typeSerializer.copy(from.getValue()));
-	}
-
-	@Override
-	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
-		from.copyTo(typeSerializer.copy(from.getValue(), reuse.getValue()), reuse);
-		return reuse;
-	}
-
-	@Override
-	public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
-		typeSerializer.serialize(value.getValue(), target);
-	}
-
-	@Override
-	public StreamRecord<T> deserialize(DataInputView source) throws IOException {
-		return new StreamRecord<T>(typeSerializer.deserialize(source));
-	}
-
-	@Override
-	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
-		T element = typeSerializer.deserialize(reuse.getValue(), source);
-		reuse.replace(element);
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		typeSerializer.copy(source, target);
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof StreamRecordSerializer) {
-			StreamRecordSerializer<?> other = (StreamRecordSerializer<?>) obj;
-
-			return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof StreamRecordSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		return typeSerializer.hashCode();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Serializer configuration snapshotting & compatibility
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public StreamRecordSerializerConfigSnapshot snapshotConfiguration() {
-		return new StreamRecordSerializerConfigSnapshot<>(typeSerializer);
-	}
-
-	@Override
-	public CompatibilityResult<StreamRecord<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-		if (configSnapshot instanceof StreamRecordSerializerConfigSnapshot) {
-			Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig =
-				((StreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-			CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousTypeSerializerAndConfig.f0,
-					UnloadableDummyTypeSerializer.class,
-					previousTypeSerializerAndConfig.f1,
-					typeSerializer);
-
-			if (!compatResult.isRequiresMigration()) {
-				return CompatibilityResult.compatible();
-			} else if (compatResult.getConvertDeserializer() != null) {
-				return CompatibilityResult.requiresMigration(
-					new StreamRecordSerializer<>(
-						new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
-			}
-		}
-
-		return CompatibilityResult.requiresMigration();
-	}
-
-	/**
-	 * Configuration snapshot specific to the {@link StreamRecordSerializer}.
-	 */
-	public static final class StreamRecordSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
-
-		private static final int VERSION = 1;
-
-		/** This empty nullary constructor is required for deserializing the configuration. */
-		public StreamRecordSerializerConfigSnapshot() {}
-
-		public StreamRecordSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) {
-			super(typeSerializer);
-		}
-
-		@Override
-		public int getVersion() {
-			return VERSION;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
deleted file mode 100644
index cb3c7cc..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
-
-/**
- * This method must be implemented by functions that have state that needs to be
- * checkpointed. The functions get a call whenever a checkpoint should take place
- * and return a snapshot of their state, which will be checkpointed.
- *
- * <h1>Deprecation and Replacement</h1>
- * The short cut replacement for this interface is via {@link ListCheckpointed} and works
- * as shown in the example below. The {@code ListCheckpointed} interface returns a list of
- * elements (
- *
- * <p><pre>{@code
- * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> {
- *
- *     private int count;
- *
- *     public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- *         return Collections.singletonList(this.count);
- *     }
- *
- *     public void restoreState(List<Integer> state) throws Exception {
- *         this.value = state.isEmpty() ? 0 : state.get(0);
- *     }
- *
- *     public T map(T value) {
- *         count++;
- *         return value;
- *     }
- * }
- * }</pre>
- *
- * @param <T> The type of the operator state.
- *
- * @deprecated Please use {@link ListCheckpointed} as illustrated above, or
- *             {@link CheckpointedFunction} for more control over the checkpointing process.
- */
-@Deprecated
-@PublicEvolving
-public interface Checkpointed<T extends Serializable> extends CheckpointedRestoring<T> {
-
-	/**
-	 * Gets the current state of the function of operator. The state must reflect the result of all
-	 * prior invocations to this function.
-	 *
-	 * @param checkpointId The ID of the checkpoint.
-	 * @param checkpointTimestamp The timestamp of the checkpoint, as derived by
-	 *                            System.currentTimeMillis() on the JobManager.
-	 *
-	 * @return A snapshot of the operator state.
-	 *
-	 * @throws Exception Thrown if the creation of the state object failed. This causes the
-	 *                   checkpoint to fail. The system may decide to fail the operation (and trigger
-	 *                   recovery), or to discard this checkpoint attempt and to continue running
-	 *                   and to try again with the next checkpoint attempt.
-	 */
-	T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
deleted file mode 100644
index 5138b49..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
-
-/**
- * This interface marks a function/operator as checkpointed similar to the
- * {@link Checkpointed} interface, but gives the Flink framework the option to
- * perform the checkpoint asynchronously. Note that asynchronous checkpointing for
- * this interface has not been implemented.
- *
- * <h1>Deprecation and Replacement</h1>
- * The shortcut replacement for this interface is via {@link ListCheckpointed} and works
- * as shown in the example below. Please refer to the JavaDocs of {@link ListCheckpointed} for
- * a more detailed description of how to use the new interface.
- *
- * <p><pre>{@code
- * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> {
- *
- *     private int count;
- *
- *     public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- *         return Collections.singletonList(this.count);
- *     }
- *
- *     public void restoreState(List<Integer> state) throws Exception {
- *         this.value = state.isEmpty() ? 0 : state.get(0);
- *     }
- *
- *     public T map(T value) {
- *         count++;
- *         return value;
- *     }
- * }
- * }</pre>
- *
- * @deprecated Please use {@link ListCheckpointed} and {@link CheckpointedFunction} instead,
- *             as illustrated in the example above.
- */
-@Deprecated
-@PublicEvolving
-public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java
deleted file mode 100644
index cfaa505..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
-
-/**
- * This deprecated interface contains the methods for restoring from the legacy checkpointing mechanism of state.
- * @param <T> type of the restored state.
- *
- * @deprecated Please use {@link CheckpointedFunction} or {@link ListCheckpointed} after restoring your legacy state.
- */
-@Deprecated
-@PublicEvolving
-public interface CheckpointedRestoring<T extends Serializable> {
-	/**
-	 * Restores the state of the function or operator to that of a previous checkpoint.
-	 * This method is invoked when a function is executed as part of a recovery run.
-	 *
-	 * <p>Note that restoreState() is called before open().
-	 *
-	 * @param state The state to be restored.
-	 */
-	void restoreState(T state) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
deleted file mode 100644
index bb6e4bc..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-/**
- * For specifying what type of window operator was used to create the state
- * that a {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}
- * is restoring from. This is used to signal that state written using an aligned processing-time
- * window operator should be restored.
- */
-public enum LegacyWindowOperatorType {
-
-	FAST_ACCUMULATING(true, false),
-
-	FAST_AGGREGATING(false, true),
-
-	NONE(false, false);
-
-	// ------------------------------------------------------------------------
-
-	private final boolean fastAccumulating;
-	private final boolean fastAggregating;
-
-	LegacyWindowOperatorType(boolean fastAccumulating, boolean fastAggregating) {
-		this.fastAccumulating = fastAccumulating;
-		this.fastAggregating = fastAggregating;
-	}
-
-	public boolean isFastAccumulating() {
-		return fastAccumulating;
-	}
-
-	public boolean isFastAggregating() {
-		return fastAggregating;
-	}
-
-	@Override
-	public String toString() {
-		if (fastAccumulating) {
-			return "AccumulatingProcessingTimeWindowOperator";
-		} else if (fastAggregating) {
-			return "AggregatingProcessingTimeWindowOperator";
-		} else {
-			return "WindowOperator";
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 348861f..f904a10 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
@@ -50,19 +49,11 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingAlignedProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
@@ -227,33 +218,7 @@ public class WindowedStream<T, K, W extends Window> {
 
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
-
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "WindowedStream." + callLocation;
-
-		SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
-		if (result != null) {
-			return result;
-		}
-
-		LegacyWindowOperatorType legacyOpType = getLegacyWindowType(function);
-		return reduce(function, new PassThroughWindowFunction<K, W, T>(), legacyOpType);
-	}
-
-	/**
-	 * Applies the given window function to each window. The window function is called for each
-	 * evaluation of the window for each key individually. The output of the window function is
-	 * interpreted as a regular non-windowed stream.
-	 *
-	 * <p>Arriving data is incrementally aggregated using the given reducer.
-	 *
-	 * @param reduceFunction The reduce function that is used for incremental aggregation.
-	 * @param function The window function.
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	@PublicEvolving
-	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
-		return reduce(reduceFunction, function, LegacyWindowOperatorType.NONE);
+		return reduce(function, new PassThroughWindowFunction<K, W, T>());
 	}
 
 	/**
@@ -265,39 +230,15 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
-	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	@PublicEvolving
 	public <R> SingleOutputStreamOperator<R> reduce(
-		ReduceFunction<T> reduceFunction,
-		WindowFunction<T, R, K, W> function,
-		TypeInformation<R> resultType) {
-		return reduce(reduceFunction, function, resultType, LegacyWindowOperatorType.NONE);
-	}
-
-	/**
-	 * Applies the given window function to each window. The window function is called for each
-	 * evaluation of the window for each key individually. The output of the window function is
-	 * interpreted as a regular non-windowed stream.
-	 *
-	 * <p>Arriving data is incrementally aggregated using the given reducer.
-	 *
-	 * @param reduceFunction The reduce function that is used for incremental aggregation.
-	 * @param function The window function.
-	 * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
-	 *                           the type of the previous operator whose state we inherit.
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	private <R> SingleOutputStreamOperator<R> reduce(
 			ReduceFunction<T> reduceFunction,
-			WindowFunction<T, R, K, W> function,
-			LegacyWindowOperatorType legacyWindowOpType) {
+			WindowFunction<T, R, K, W> function) {
 
 		TypeInformation<T> inType = input.getType();
 		TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
-
-		return reduce(reduceFunction, function, resultType, legacyWindowOpType);
+		return reduce(reduceFunction, function, resultType);
 	}
 
 	/**
@@ -310,15 +251,12 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
 	 * @param resultType Type information for the result type of the window function.
-	 * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
-	 *                           the type of the previous operator whose state we inherit.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	private <R> SingleOutputStreamOperator<R> reduce(
+	public <R> SingleOutputStreamOperator<R> reduce(
 			ReduceFunction<T> reduceFunction,
 			WindowFunction<T, R, K, W> function,
-			TypeInformation<R> resultType,
-			LegacyWindowOperatorType legacyWindowOpType) {
+			TypeInformation<R> resultType) {
 
 		if (reduceFunction instanceof RichFunction) {
 			throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
@@ -374,8 +312,7 @@ public class WindowedStream<T, K, W extends Window> {
 					new InternalSingleValueWindowFunction<>(function),
 					trigger,
 					allowedLateness,
-					lateDataOutputTag,
-					legacyWindowOpType);
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -1183,12 +1120,6 @@ public class WindowedStream<T, K, W extends Window> {
 
 		String udfName = "WindowedStream." + callLocation;
 
-		SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
-		if (result != null) {
-			return result;
-		}
-
-		LegacyWindowOperatorType legacyWindowOpType = getLegacyWindowType(function);
 		String opName;
 		KeySelector<T, K> keySel = input.getKeySelector();
 
@@ -1231,8 +1162,7 @@ public class WindowedStream<T, K, W extends Window> {
 					function,
 					trigger,
 					allowedLateness,
-					lateDataOutputTag,
-					legacyWindowOpType);
+					lateDataOutputTag);
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -1629,119 +1559,6 @@ public class WindowedStream<T, K, W extends Window> {
 		return reduce(aggregator);
 	}
 
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private LegacyWindowOperatorType getLegacyWindowType(Function function) {
-		if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			if (function instanceof ReduceFunction) {
-				return LegacyWindowOperatorType.FAST_AGGREGATING;
-			} else if (function instanceof WindowFunction) {
-				return LegacyWindowOperatorType.FAST_ACCUMULATING;
-			}
-		} else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			if (function instanceof ReduceFunction) {
-				return LegacyWindowOperatorType.FAST_AGGREGATING;
-			} else if (function instanceof WindowFunction) {
-				return LegacyWindowOperatorType.FAST_ACCUMULATING;
-			}
-		}
-		return LegacyWindowOperatorType.NONE;
-	}
-
-	private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
-			ReduceFunction<?> function,
-			TypeInformation<R> resultType,
-			String functionName) {
-
-		if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
-			SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSlide();
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
-			@SuppressWarnings("unchecked")
-			ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-			@SuppressWarnings("unchecked")
-			OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-					new AggregatingProcessingTimeWindowOperator<>(
-							reducer, input.getKeySelector(),
-							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-							input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-							windowLength, windowSlide);
-			return input.transform(opName, resultType, op);
-
-		} else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
-			TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSize();
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
-			@SuppressWarnings("unchecked")
-			ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-			@SuppressWarnings("unchecked")
-			OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-					new AggregatingProcessingTimeWindowOperator<>(
-							reducer,
-							input.getKeySelector(),
-							input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-							input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-							windowLength, windowSlide);
-			return input.transform(opName, resultType, op);
-		}
-
-		return null;
-	}
-
-	private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
-			InternalWindowFunction<Iterable<T>, R, K, W> function,
-			TypeInformation<R> resultType,
-			String functionName) {
-
-		if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
-			SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSlide();
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
-			@SuppressWarnings("unchecked")
-			InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
-					(InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
-
-			OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-					timeWindowFunction, input.getKeySelector(),
-					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-					input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-					windowLength, windowSlide);
-			return input.transform(opName, resultType, op);
-		} else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
-			TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSize();
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
-			@SuppressWarnings("unchecked")
-			InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
-					(InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
-
-			OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-					timeWindowFunction, input.getKeySelector(),
-					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-					input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-					windowLength, windowSlide);
-			return input.transform(opName, resultType, op);
-		}
-
-		return null;
-	}
-
 	public StreamExecutionEnvironment getExecutionEnvironment() {
 		return input.getExecutionEnvironment();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 3c4cfbd..fedd791 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -32,7 +32,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -66,7 +65,7 @@ import java.util.TreeMap;
  */
 @Internal
 public class ContinuousFileMonitoringFunction<OUT>
-	extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction, CheckpointedRestoring<Long> {
+	extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction {
 
 	private static final long serialVersionUID = 1L;
 
@@ -375,12 +374,4 @@ public class ContinuousFileMonitoringFunction<OUT>
 			LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime);
 		}
 	}
-
-	@Override
-	public void restoreState(Long state) throws Exception {
-		this.globalModificationTime = state;
-
-		LOG.info("{} (taskIdx={}) restored global modification time from an older Flink version: {}",
-			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), globalModificationTime);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 3a9e8e1..e14cfda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -25,30 +25,23 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
@@ -60,15 +53,15 @@ import static org.apache.flink.util.Preconditions.checkState;
  * The operator that reads the {@link TimestampedFileInputSplit splits} received from the preceding
  * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction}
  * which has a parallelism of 1, this operator can have DOP > 1.
- * <p/>
- * As soon as a split descriptor is received, it is put in a queue, and have another
+ *
+ * <p>As soon as a split descriptor is received, it is put in a queue, and have another
  * thread read the actual data of the split. This architecture allows the separation of the
  * reading thread from the one emitting the checkpoint barriers, thus removing any potential
  * back-pressure.
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
-	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, CheckpointedRestoringOperator {
+	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -422,83 +415,4 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 				getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
 		}
 	}
-
-	// ------------------------------------------------------------------------
-	//  Restoring / Migrating from an older Flink version.
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void restoreState(FSDataInputStream in) throws Exception {
-
-		LOG.info("{} (taskIdx={}) restoring state from an older Flink version.",
-			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
-
-		// this is just to read the byte indicating if we have udf state or not
-		int hasUdfState = in.read();
-
-		Preconditions.checkArgument(hasUdfState == 0);
-
-		final ObjectInputStream ois = new ObjectInputStream(in);
-		final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
-
-		// read the split that was being read
-		FileInputSplit currSplit = (FileInputSplit) ois.readObject();
-
-		// read the pending splits list
-		List<FileInputSplit> pendingSplits = new LinkedList<>();
-		int noOfSplits = div.readInt();
-		for (int i = 0; i < noOfSplits; i++) {
-			FileInputSplit split = (FileInputSplit) ois.readObject();
-			pendingSplits.add(split);
-		}
-
-		// read the state of the format
-		Serializable formatState = (Serializable) ois.readObject();
-
-		div.close();
-
-		if (restoredReaderState == null) {
-			restoredReaderState = new ArrayList<>();
-		}
-
-		// we do not know the modification time of the retrieved splits, so we assign them
-		// artificial ones, with the only constraint that they respect the relative order of the
-		// retrieved splits, because modification time is going to be used to sort the splits within
-		// the "pending splits" priority queue.
-
-		long now = getProcessingTimeService().getCurrentProcessingTime();
-		long runningModTime = Math.max(now, noOfSplits + 1);
-
-		TimestampedFileInputSplit currentSplit = createTimestampedFileSplit(currSplit, --runningModTime, formatState);
-		restoredReaderState.add(currentSplit);
-		for (FileInputSplit split : pendingSplits) {
-			TimestampedFileInputSplit timestampedSplit = createTimestampedFileSplit(split, --runningModTime);
-			restoredReaderState.add(timestampedSplit);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} (taskIdx={}) restored {} splits from legacy: {}.",
-					getClass().getSimpleName(),
-					getRuntimeContext().getIndexOfThisSubtask(),
-					restoredReaderState.size(),
-					restoredReaderState);
-			}
-		}
-	}
-
-	private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime) {
-		return createTimestampedFileSplit(split, modificationTime, null);
-	}
-
-	private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime, Serializable state) {
-		TimestampedFileInputSplit timestampedSplit = new TimestampedFileInputSplit(
-			modificationTime, split.getSplitNumber(), split.getPath(),
-			split.getStart(), split.getLength(), split.getHostnames());
-
-		if (state != null) {
-			timestampedSplit.setSplitState(state);
-		}
-		return timestampedSplit;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index abaa74e..884b899 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -116,7 +115,7 @@ public class StreamingJobGraphGenerator {
 	private StreamingJobGraphGenerator(StreamGraph streamGraph) {
 		this.streamGraph = streamGraph;
 		this.defaultStreamGraphHasher = new StreamGraphHasherV2();
-		this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
+		this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
 
 		this.jobVertices = new HashMap<>();
 		this.builtVertices = new HashSet<>();
@@ -241,14 +240,14 @@ public class StreamingJobGraphGenerator {
 				createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
 			}
 
-			List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.get(startNodeId);
-			if (operatorHashes == null) {
-				operatorHashes = new ArrayList<>();
-				chainedOperatorHashes.put(startNodeId, operatorHashes);
-			}
+			List<Tuple2<byte[], byte[]>> operatorHashes =
+				chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
 
 			byte[] primaryHashBytes = hashes.get(currentNodeId);
-			operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHashes.get(1).get(currentNodeId)));
+
+			for (Map<Integer, byte[]> legacyHash : legacyHashes) {
+				operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
+			}
 
 			chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
 			chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a72b9fe..a28fc30 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
@@ -55,7 +54,6 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -251,42 +249,6 @@ public abstract class AbstractStreamOperator<OUT>
 				getContainingTask().getCancelables()); // access to register streams for canceling
 
 		initializeState(initializationContext);
-
-		if (restoring) {
-
-			// finally restore the legacy state in case we are
-			// migrating from a previous Flink version.
-
-			restoreStreamCheckpointed(stateHandles);
-		}
-	}
-
-	/**
-	 * @deprecated Non-repartitionable operator state that has been deprecated.
-	 * Can be removed when we remove the APIs for non-repartitionable operator state.
-	 */
-	@Deprecated
-	private void restoreStreamCheckpointed(OperatorSubtaskState stateHandles) throws Exception {
-		StreamStateHandle state = stateHandles.getLegacyOperatorState();
-		if (null != state) {
-			if (this instanceof CheckpointedRestoringOperator) {
-
-				LOG.debug("Restore state of task {} in operator with id ({}).",
-					getContainingTask().getName(), getOperatorID());
-
-				FSDataInputStream is = state.openInputStream();
-				try {
-					getContainingTask().getCancelables().registerClosable(is);
-					((CheckpointedRestoringOperator) this).restoreState(is);
-				} finally {
-					getContainingTask().getCancelables().unregisterClosable(is);
-					is.close();
-				}
-			} else {
-				throw new Exception(
-						"Found legacy operator state for operator that does not implement StreamCheckpointedOperator.");
-			}
-		}
 	}
 
 	/**
@@ -451,35 +413,6 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	/**
-	 * @deprecated Non-repartitionable operator state that has been deprecated.
-	 * Can be removed when we remove the APIs for non-repartitionable operator state.
-	 */
-	@SuppressWarnings("deprecation")
-	@Deprecated
-	@Override
-	public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
-		if (this instanceof StreamCheckpointedOperator) {
-			CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
-
-			final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-				factory.createCheckpointStateOutputStream(checkpointId, timestamp);
-
-			getContainingTask().getCancelables().registerClosable(outStream);
-
-			try {
-				((StreamCheckpointedOperator) this).snapshotState(outStream, checkpointId, timestamp);
-				return outStream.closeAndGetHandle();
-			}
-			finally {
-				getContainingTask().getCancelables().unregisterClosable(outStream);
-				outStream.close();
-			}
-		} else {
-			return null;
-		}
-	}
-
-	/**
 	 * Stream operators with state which can be restored need to override this hook method.
 	 *
 	 * @param context context that allows to register different states.