You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/23 14:22:37 UTC

flink git commit: [FLINK-5480] [savepoints] Add setUidHash method to DataStream API

Repository: flink
Updated Branches:
  refs/heads/master f3419af33 -> 0de2bc35b


[FLINK-5480] [savepoints] Add setUidHash method to DataStream API

This closes #3117.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0de2bc35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0de2bc35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0de2bc35

Branch: refs/heads/master
Commit: 0de2bc35b30685f363d40a0b9989271ff230bb9b
Parents: f3419af
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Jan 12 18:57:52 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 23 15:22:19 2017 +0100

----------------------------------------------------------------------
 .../connectors/cassandra/CassandraSink.java     | 32 +++++++++++
 .../api/graph/StreamGraphHasherV1.java          |  4 +-
 .../api/datastream/DataStreamSink.java          | 26 +++++++++
 .../datastream/SingleOutputStreamOperator.java  | 26 +++++++++
 .../flink/streaming/api/graph/StreamGraph.java  | 37 +++++++------
 .../api/graph/StreamGraphGenerator.java         |  5 +-
 .../api/graph/StreamGraphHasherV2.java          |  4 +-
 .../api/graph/StreamGraphUserHashHasher.java    | 57 ++++++++++++++++++++
 .../flink/streaming/api/graph/StreamNode.java   | 19 +++++--
 .../api/graph/StreamingJobGraphGenerator.java   |  8 +--
 .../transformations/StreamTransformation.java   | 44 ++++++++++++++-
 .../StreamingJobGraphGeneratorNodeHashTest.java | 40 ++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  | 24 +++++++++
 13 files changed, 296 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 180b638..9f0079f 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.core.Cluster;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -85,6 +86,7 @@ public class CassandraSink<IN> {
 	 * @param uid The unique user-specified ID of this transformation.
 	 * @return The operator with the specified ID.
 	 */
+	@PublicEvolving
 	public CassandraSink<IN> uid(String uid) {
 		if (useDataStreamSink) {
 			getSinkTransformation().setUid(uid);
@@ -95,6 +97,36 @@ public class CassandraSink<IN> {
 	}
 
 	/**
+	 * Sets a user provided hash for this operator. This will be used AS IS to create the JobVertexID.
+	 * <p/>
+	 * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
+	 * operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
+	 * <p/>
+	 * <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
+	 * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
+	 * assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will cause your job to fail.
+	 *
+	 * <p>
+	 * A use case for this is in migration between Flink versions or changing the jobs in a way that changes the
+	 * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
+	 * obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
+	 * <p/>
+	 *
+	 * @param uidHash The user provided hash for this operator. This will become the JobVertexID, which is shown in the
+	 *                 logs and web ui.
+	 * @return The operator with the user provided hash.
+	 */
+	@PublicEvolving
+	public CassandraSink<IN> setUidHash(String uidHash) {
+		if (useDataStreamSink) {
+			getSinkTransformation().setUidHash(uidHash);
+		} else {
+			getStreamTransformation().setUidHash(uidHash);
+		}
+		return this;
+	}
+
+	/**
 	 * Sets the parallelism for this sink. The degree must be higher than zero.
 	 *
 	 * @param parallelism The parallelism for this sink.

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
index dec0c18..a80ff7d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
@@ -130,7 +130,7 @@ public class StreamGraphHasherV1 implements StreamGraphHasher {
 			boolean isChainingEnabled) {
 
 		// Check for user-specified ID
-		String userSpecifiedHash = node.getTransformationId();
+		String userSpecifiedHash = node.getTransformationUID();
 
 		if (userSpecifiedHash == null) {
 			// Check that all input nodes have their hashes computed
@@ -192,7 +192,7 @@ public class StreamGraphHasherV1 implements StreamGraphHasher {
 	 * Generates a hash from a user-specified ID.
 	 */
 	private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
-		hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
+		hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));
 
 		return hasher.hash().asBytes();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index d3c1ea8..0c9378b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -77,6 +77,32 @@ public class DataStreamSink<T> {
 	}
 
 	/**
+	 * Sets a user provided hash for this operator. This will be used AS IS to create the JobVertexID.
+	 * <p/>
+	 * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
+	 * operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
+	 * <p/>
+	 * <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
+	 * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
+	 * assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will cause your job to fail.
+	 *
+	 * <p>
+	 * A use case for this is in migration between Flink versions or changing the jobs in a way that changes the
+	 * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
+	 * obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
+	 * <p/>
+	 *
+	 * @param uidHash The user provided hash for this operator. This will become the JobVertexID, which is shown in the
+	 *                 logs and web ui.
+	 * @return The operator with the user provided hash.
+	 */
+	@PublicEvolving
+	public DataStreamSink<T> setUidHash(String uidHash) {
+		transformation.setUidHash(uidHash);
+		return this;
+	}
+
+	/**
 	 * Sets the parallelism for this sink. The degree must be higher than zero.
 	 *
 	 * @param parallelism The parallelism for this sink.

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 614f19b..3fe21fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -88,6 +88,32 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 	}
 
 	/**
+	 * Sets a user provided hash for this operator. This will be used AS IS to create the JobVertexID.
+	 * <p/>
+	 * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
+	 * operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
+	 * <p/>
+	 * <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
+	 * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
+	 * assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will cause your job to fail.
+	 *
+	 * <p>
+	 * A use case for this is in migration between Flink versions or changing the jobs in a way that changes the
+	 * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
+	 * obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
+	 * <p/>
+	 *
+	 * @param uidHash The user provided hash for this operator. This will become the JobVertexID, which is shown in the
+	 *                 logs and web ui.
+	 * @return The operator with the user provided hash.
+	 */
+	@PublicEvolving
+	public SingleOutputStreamOperator<T> setUidHash(String uidHash) {
+		transformation.setUidHash(uidHash);
+		return this;
+	}
+
+	/**
 	 * Sets the parallelism for this operator. The degree must be 1 or more.
 	 * 
 	 * @param parallelism

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 29a5ea5..a4a8dc7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -17,18 +17,6 @@
 
 package org.apache.flink.streaming.api.graph;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
@@ -41,6 +29,7 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,7 +38,6 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -63,6 +51,18 @@ import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * Class representing the streaming topology. It contains all the information
  * necessary to build the jobgraph for the execution.
@@ -472,10 +472,17 @@ public class StreamGraph extends StreamingPlan {
 		getStreamNode(vertexID).setInputFormat(inputFormat);
 	}
 
-	void setTransformationId(Integer nodeId, String transformationId) {
+	void setTransformationUID(Integer nodeId, String transformationId) {
+		StreamNode node = streamNodes.get(nodeId);
+		if (node != null) {
+			node.setTransformationUID(transformationId);
+		}
+	}
+
+	void setTransformationUserHash(Integer nodeId, String nodeHash) {
 		StreamNode node = streamNodes.get(nodeId);
 		if (node != null) {
-			node.setTransformationId(transformationId);
+			node.setUserHash(nodeHash);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 506b664..7ab7494 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -208,7 +208,10 @@ public class StreamGraphGenerator {
 			streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
 		}
 		if (transform.getUid() != null) {
-			streamGraph.setTransformationId(transform.getId(), transform.getUid());
+			streamGraph.setTransformationUID(transform.getId(), transform.getUid());
+		}
+		if (transform.getUserProvidedNodeHash() != null) {
+			streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
 		}
 
 		return transformedIds;

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
index 75b606a..d5084ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
@@ -148,7 +148,7 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
 			boolean isChainingEnabled) {
 
 		// Check for user-specified ID
-		String userSpecifiedHash = node.getTransformationId();
+		String userSpecifiedHash = node.getTransformationUID();
 
 		if (userSpecifiedHash == null) {
 			// Check that all input nodes have their hashes computed
@@ -210,7 +210,7 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
 	 * Generates a hash from a user-specified ID.
 	 */
 	private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
-		hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
+		hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));
 
 		return hasher.hash().asBytes();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
new file mode 100644
index 0000000..a5e77cc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * StreamGraphHasher that works with user provided hashes. This is useful in case we want to set (alternative) hashes
+ * explicitly, e.g. to provide a way of manual backwards compatibility between versions when the mechanism of generating
+ * hashes has changed in an incompatible way.
+ *
+ */
+public class StreamGraphUserHashHasher implements StreamGraphHasher {
+
+	@Override
+	public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+		HashMap<Integer, byte[]> hashResult = new HashMap<>();
+		for (StreamNode streamNode : streamGraph.getStreamNodes()) {
+
+			String userHash = streamNode.getUserHash();
+
+			if (null != userHash) {
+				for (StreamEdge inEdge : streamNode.getInEdges()) {
+					if (StreamingJobGraphGenerator.isChainable(inEdge, streamGraph)) {
+						throw new UnsupportedOperationException("Cannot assign user-specified hash "
+								+ "to intermediate node in chain. This will be supported in future "
+								+ "versions of Flink. As a work around start new chain at task "
+								+ streamNode.getOperatorName() + ".");
+					}
+				}
+
+				hashResult.put(streamNode.getId(), StringUtils.hexStringToByte(userHash));
+			}
+		}
+
+		return hashResult;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 7085eeb..0d58ed2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -69,7 +69,8 @@ public class StreamNode implements Serializable {
 
 	private InputFormat<?, ?> inputFormat;
 
-	private String transformationId;
+	private String transformationUID;
+	private String userHash;
 
 	public StreamNode(StreamExecutionEnvironment env,
 		Integer id,
@@ -272,12 +273,20 @@ public class StreamNode implements Serializable {
 		this.stateKeySerializer = stateKeySerializer;
 	}
 
-	public String getTransformationId() {
-		return transformationId;
+	public String getTransformationUID() {
+		return transformationUID;
 	}
 
-	void setTransformationId(String transformationId) {
-		this.transformationId = transformationId;
+	void setTransformationUID(String transformationId) {
+		this.transformationUID = transformationId;
+	}
+
+	public String getUserHash() {
+		return userHash;
+	}
+
+	public void setUserHash(String userHash) {
+		this.userHash = userHash;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index f562b98..e306f30 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -51,8 +51,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -89,7 +89,7 @@ public class StreamingJobGraphGenerator {
 	public StreamingJobGraphGenerator(StreamGraph streamGraph) {
 		this.streamGraph = streamGraph;
 		this.defaultStreamGraphHasher = new StreamGraphHasherV2();
-		this.legacyStreamGraphHashers = Collections.<StreamGraphHasher>singletonList(new StreamGraphHasherV1());
+		this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
 	}
 
 	private void init() {
@@ -185,7 +185,7 @@ public class StreamingJobGraphGenerator {
 			List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
 
 			for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
-				if (isChainable(outEdge)) {
+				if (isChainable(outEdge, streamGraph)) {
 					chainableOutputs.add(outEdge);
 				} else {
 					nonChainableOutputs.add(outEdge);
@@ -426,7 +426,7 @@ public class StreamingJobGraphGenerator {
 		}
 	}
 
-	private boolean isChainable(StreamEdge edge) {
+	public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
 		StreamNode upStreamVertex = edge.getSourceVertex();
 		StreamNode downStreamVertex = edge.getTargetVertex();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index e674619..f7aecdb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -99,11 +99,13 @@ public abstract class StreamTransformation<T> {
 
 	// This is used to assign a unique ID to every StreamTransformation
 	protected static Integer idCounter = 0;
+
 	public static int getNewNodeId() {
 		idCounter++;
 		return idCounter;
 	}
 
+
 	protected final int id;
 
 	protected String name;
@@ -130,6 +132,8 @@ public abstract class StreamTransformation<T> {
 	 */
 	private String uid;
 
+	private String userProvidedNodeHash;
+
 	protected long bufferTimeout = -1;
 
 	private String slotSharingGroup;
@@ -205,7 +209,45 @@ public abstract class StreamTransformation<T> {
 	}
 
 	/**
-	 * Sets an ID for this {@link StreamTransformation}.
+	 * Sets a user provided hash for this operator. This will be used AS IS to create the JobVertexID.
+	 * <p/>
+	 * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
+	 * operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
+	 * <p/>
+	 * <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
+	 * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
+	 * assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will cause your job to fail.
+	 *
+	 * <p>
+	 * A use case for this is in migration between Flink versions or changing the jobs in a way that changes the
+	 * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
+	 * obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
+	 * <p/>
+	 *
+	 * @param uidHash The user provided hash for this operator. This will become the JobVertexID, which is shown in the
+	 *                 logs and web ui.
+	 */
+	public void setUidHash(String uidHash) {
+
+		Preconditions.checkNotNull(uidHash);
+		Preconditions.checkArgument(uidHash.matches("^[0-9A-Fa-f]{32}$"),
+				"Node hash must be a 32 character String that describes a hex code. Found: " + uidHash);
+
+		this.userProvidedNodeHash = uidHash;
+	}
+
+	/**
+	 * Gets the user provided hash.
+	 *
+	 * @return The user provided hash.
+	 */
+	public String getUserProvidedNodeHash() {
+		return userProvidedNodeHash;
+	}
+
+	/**
+	 * Sets an ID for this {@link StreamTransformation}. This will later be hashed to a uidHash which is then used to
+	 * create the JobVertexID (that is shown in logs and the web ui).
 	 *
 	 * <p>The specified ID is used to assign the same operator ID across job
 	 * submissions (for example when starting a job from a savepoint).

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index 340981b..05aa694 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -32,8 +32,10 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -45,6 +47,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests the {@link StreamNode} hash assignment during translation from {@link StreamGraph} to
@@ -422,6 +425,43 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 		env.getStreamGraph().getJobGraph();
 	}
 
+	@Test
+	public void testUserProvidedHashing() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+		List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
+
+		env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
+				.map(new NoOpMapFunction())
+				.filter(new NoOpFilterFunction())
+				.keyBy(new NoOpKeySelector())
+				.reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));
+
+		StreamGraph streamGraph = env.getStreamGraph();
+		int idx = 1;
+		for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
+			Assert.assertEquals(jobVertex.getIdAlternatives().get(1).toString(), userHashes.get(idx));
+			--idx;
+		}
+	}
+
+	@Test
+	public void testUserProvidedHashingOnChainNotSupported() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+		env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
+				.map(new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
+				.filter(new NoOpFilterFunction()).setUidHash("cccccccccccccccccccccccccccccccc")
+				.keyBy(new NoOpKeySelector())
+				.reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
+
+		try {
+			env.getStreamGraph().getJobGraph();
+			fail();
+		} catch (UnsupportedOperationException ignored) {
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index dbc91bd..ba92f86 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -201,6 +201,30 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   /**
+    * Sets a user provided hash for this operator. This will be used AS IS to create
+    * the JobVertexID.
+    * <p/>
+    * <p>The user provided hash is an alternative to the generated hashes, that is
+    * considered when identifying an operator through the default hash mechanics fails
+    * (e.g. because of changes between Flink versions).
+    * <p/>
+    * <p><strong>Important</strong>: this should be used as a workaround or for
+    * troubleshooting. The provided hash needs to be unique per transformation and job.
+    * Otherwise, job submission will fail. Furthermore, you cannot assign a user-specified hash
+    * to intermediate nodes in an operator chain; trying to do so will cause your job to fail.
+    *
+    * @param hash the user provided hash for this operator.
+    * @return The operator with the user provided hash.
+    */
+  @PublicEvolving
+  def setUidHash(hash: String) : DataStream[T] = javaStream match {
+    case stream : SingleOutputStreamOperator[T] =>
+      asScalaStream(stream.setUidHash(hash))
+    case _ => throw new UnsupportedOperationException("Only supported for operators.")
+      this
+  }
+
+  /**
    * Turns off chaining for this operator so thread co-location will not be
    * used as an optimization. </p> Chaining can be turned off for the whole
    * job by [[StreamExecutionEnvironment.disableOperatorChaining()]]