You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/23 14:22:37 UTC
flink git commit: [FLINK-5480] [savepoints] Add setUidHash method to
DataStream API
Repository: flink
Updated Branches:
refs/heads/master f3419af33 -> 0de2bc35b
[FLINK-5480] [savepoints] Add setUidHash method to DataStream API
This closes #3117.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0de2bc35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0de2bc35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0de2bc35
Branch: refs/heads/master
Commit: 0de2bc35b30685f363d40a0b9989271ff230bb9b
Parents: f3419af
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Jan 12 18:57:52 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jan 23 15:22:19 2017 +0100
----------------------------------------------------------------------
.../connectors/cassandra/CassandraSink.java | 32 +++++++++++
.../api/graph/StreamGraphHasherV1.java | 4 +-
.../api/datastream/DataStreamSink.java | 26 +++++++++
.../datastream/SingleOutputStreamOperator.java | 26 +++++++++
.../flink/streaming/api/graph/StreamGraph.java | 37 +++++++------
.../api/graph/StreamGraphGenerator.java | 5 +-
.../api/graph/StreamGraphHasherV2.java | 4 +-
.../api/graph/StreamGraphUserHashHasher.java | 57 ++++++++++++++++++++
.../flink/streaming/api/graph/StreamNode.java | 19 +++++--
.../api/graph/StreamingJobGraphGenerator.java | 8 +--
.../transformations/StreamTransformation.java | 44 ++++++++++++++-
.../StreamingJobGraphGeneratorNodeHashTest.java | 40 ++++++++++++++
.../flink/streaming/api/scala/DataStream.scala | 24 +++++++++
13 files changed, 296 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 180b638..9f0079f 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.cassandra;
import com.datastax.driver.core.Cluster;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
@@ -85,6 +86,7 @@ public class CassandraSink<IN> {
* @param uid The unique user-specified ID of this transformation.
* @return The operator with the specified ID.
*/
+ @PublicEvolving
public CassandraSink<IN> uid(String uid) {
if (useDataStreamSink) {
getSinkTransformation().setUid(uid);
@@ -95,6 +97,36 @@ public class CassandraSink<IN> {
}
/**
+ * Sets a user-provided hash for this operator. This will be used AS IS to create the JobVertexID.
+ *
+ * <p>The user-provided hash is an alternative to the generated hashes that is considered when identifying an
+ * operator through the default hash mechanism fails (e.g. because of changes between Flink versions).
+ *
+ * <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
+ * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
+ * assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will let your job fail.
+ *
+ * <p>A use case for this is migration between Flink versions or changing the job in a way that changes the
+ * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
+ * obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
+ *
+ * <p>The hash is forwarded to the sink transformation when {@code useDataStreamSink} is set, and to the stream
+ * transformation otherwise.
+ *
+ * @param uidHash The user-provided hash for this operator; it must be a 32-character hexadecimal string, other
+ * values are rejected by the transformation. This will become the JobVertexID, which is shown in the
+ * logs and web UI.
+ * @return The operator with the user-provided hash.
+ */
+ @PublicEvolving
+ public CassandraSink<IN> setUidHash(String uidHash) {
+ if (useDataStreamSink) {
+ getSinkTransformation().setUidHash(uidHash);
+ } else {
+ getStreamTransformation().setUidHash(uidHash);
+ }
+ return this;
+ }
+
+ /**
* Sets the parallelism for this sink. The degree must be higher than zero.
*
* @param parallelism The parallelism for this sink.
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
index dec0c18..a80ff7d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
@@ -130,7 +130,7 @@ public class StreamGraphHasherV1 implements StreamGraphHasher {
boolean isChainingEnabled) {
// Check for user-specified ID
- String userSpecifiedHash = node.getTransformationId();
+ String userSpecifiedHash = node.getTransformationUID();
if (userSpecifiedHash == null) {
// Check that all input nodes have their hashes computed
@@ -192,7 +192,7 @@ public class StreamGraphHasherV1 implements StreamGraphHasher {
* Generates a hash from a user-specified ID.
*/
private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
- hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
+ // Hash the UTF-8 encoding of the transformation UID (the ID the user set via uid()).
+ hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));
return hasher.hash().asBytes();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index d3c1ea8..0c9378b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -77,6 +77,32 @@ public class DataStreamSink<T> {
}
/**
+ * Sets a user-provided hash for this operator. This will be used AS IS to create the JobVertexID.
+ *
+ * <p>The user-provided hash is an alternative to the generated hashes that is considered when identifying an
+ * operator through the default hash mechanism fails (e.g. because of changes between Flink versions).
+ *
+ * <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
+ * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
+ * assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will let your job fail.
+ *
+ * <p>A use case for this is migration between Flink versions or changing the job in a way that changes the
+ * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
+ * obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
+ *
+ * @param uidHash The user-provided hash for this operator; it must be a 32-character hexadecimal string, other
+ * values are rejected by the underlying transformation. This will become the JobVertexID, which is
+ * shown in the logs and web UI.
+ * @return The operator with the user-provided hash.
+ */
+ @PublicEvolving
+ public DataStreamSink<T> setUidHash(String uidHash) {
+ transformation.setUidHash(uidHash);
+ return this;
+ }
+
+ /**
* Sets the parallelism for this sink. The degree must be higher than zero.
*
* @param parallelism The parallelism for this sink.
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 614f19b..3fe21fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -88,6 +88,32 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
}
/**
+ * Sets a user-provided hash for this operator. This will be used AS IS to create the JobVertexID.
+ *
+ * <p>The user-provided hash is an alternative to the generated hashes that is considered when identifying an
+ * operator through the default hash mechanism fails (e.g. because of changes between Flink versions).
+ *
+ * <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
+ * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
+ * assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will let your job fail.
+ *
+ * <p>A use case for this is migration between Flink versions or changing the job in a way that changes the
+ * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
+ * obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
+ *
+ * @param uidHash The user-provided hash for this operator; it must be a 32-character hexadecimal string, other
+ * values are rejected by the underlying transformation. This will become the JobVertexID, which is
+ * shown in the logs and web UI.
+ * @return The operator with the user-provided hash.
+ */
+ @PublicEvolving
+ public SingleOutputStreamOperator<T> setUidHash(String uidHash) {
+ transformation.setUidHash(uidHash);
+ return this;
+ }
+
+ /**
* Sets the parallelism for this operator. The degree must be 1 or more.
*
* @param parallelism
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 29a5ea5..a4a8dc7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -17,18 +17,6 @@
package org.apache.flink.streaming.api.graph;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
@@ -41,6 +29,7 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,7 +38,6 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -63,6 +51,18 @@ import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
* Class representing the streaming topology. It contains all the information
* necessary to build the jobgraph for the execution.
@@ -472,10 +472,17 @@ public class StreamGraph extends StreamingPlan {
getStreamNode(vertexID).setInputFormat(inputFormat);
}
- void setTransformationId(Integer nodeId, String transformationId) {
+ /**
+ * Sets the user-specified transformation UID of the stream node with the given ID.
+ * Does nothing if the graph contains no node with that ID.
+ */
+ void setTransformationUID(Integer nodeId, String transformationId) {
+ StreamNode node = streamNodes.get(nodeId);
+ if (node != null) {
+ node.setTransformationUID(transformationId);
+ }
+ }
+
+ /**
+ * Sets the user-provided hash (as given to {@code setUidHash}) of the stream node with the given ID.
+ * Does nothing if the graph contains no node with that ID.
+ */
+ void setTransformationUserHash(Integer nodeId, String nodeHash) {
StreamNode node = streamNodes.get(nodeId);
if (node != null) {
- node.setTransformationId(transformationId);
+ node.setUserHash(nodeHash);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 506b664..7ab7494 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -208,7 +208,10 @@ public class StreamGraphGenerator {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
- streamGraph.setTransformationId(transform.getId(), transform.getUid());
+ streamGraph.setTransformationUID(transform.getId(), transform.getUid());
+ }
+ if (transform.getUserProvidedNodeHash() != null) {
+ streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
return transformedIds;
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
index 75b606a..d5084ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
@@ -148,7 +148,7 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
boolean isChainingEnabled) {
// Check for user-specified ID
- String userSpecifiedHash = node.getTransformationId();
+ String userSpecifiedHash = node.getTransformationUID();
if (userSpecifiedHash == null) {
// Check that all input nodes have their hashes computed
@@ -210,7 +210,7 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
* Generates a hash from a user-specified ID.
*/
private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
- hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
+ // Hash the UTF-8 encoding of the transformation UID (the ID the user set via uid()).
+ hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));
return hasher.hash().asBytes();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
new file mode 100644
index 0000000..a5e77cc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link StreamGraphHasher} that returns the hashes provided explicitly by the user (via {@code setUidHash}) instead
+ * of generating them. This is useful in case we want to set (alternative) hashes explicitly, e.g. to provide a way of
+ * manual backwards compatibility between versions when the mechanism of generating hashes has changed in an
+ * incompatible way.
+ *
+ * <p>Only stream nodes that carry a user hash appear in the result; all other nodes are omitted.
+ */
+public class StreamGraphUserHashHasher implements StreamGraphHasher {
+
+ /**
+ * Collects the user-provided hashes of all stream nodes in the given graph.
+ *
+ * @param streamGraph The stream graph to traverse.
+ * @return Map from stream node ID to the decoded bytes of that node's user-provided hex hash. Nodes without a
+ * user hash are not contained.
+ * @throws UnsupportedOperationException If a node with a user hash has a chainable input edge, i.e. it would be
+ * an intermediate node of an operator chain.
+ */
+ @Override
+ public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+ HashMap<Integer, byte[]> hashResult = new HashMap<>();
+ for (StreamNode streamNode : streamGraph.getStreamNodes()) {
+
+ String userHash = streamNode.getUserHash();
+
+ if (null != userHash) {
+ // A node reached through a chainable edge is not the head of its chain; user-specified
+ // hashes are not supported for such intermediate chain nodes yet, so fail fast.
+ for (StreamEdge inEdge : streamNode.getInEdges()) {
+ if (StreamingJobGraphGenerator.isChainable(inEdge, streamGraph)) {
+ throw new UnsupportedOperationException("Cannot assign user-specified hash "
+ + "to intermediate node in chain. This will be supported in future "
+ + "versions of Flink. As a work around start new chain at task "
+ + streamNode.getOperatorName() + ".");
+ }
+ }
+
+ // The user hash is a hex string; store its raw byte representation.
+ hashResult.put(streamNode.getId(), StringUtils.hexStringToByte(userHash));
+ }
+ }
+
+ return hashResult;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 7085eeb..0d58ed2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -69,7 +69,8 @@ public class StreamNode implements Serializable {
private InputFormat<?, ?> inputFormat;
- private String transformationId;
+ private String transformationUID;
+ private String userHash;
public StreamNode(StreamExecutionEnvironment env,
Integer id,
@@ -272,12 +273,20 @@ public class StreamNode implements Serializable {
this.stateKeySerializer = stateKeySerializer;
}
- public String getTransformationId() {
- return transformationId;
+ /** Returns the user-specified transformation UID, or {@code null} if none was set. */
+ public String getTransformationUID() {
+ return transformationUID;
}
- void setTransformationId(String transformationId) {
- this.transformationId = transformationId;
+ /** Sets the user-specified transformation UID of this node. */
+ void setTransformationUID(String transformationId) {
+ this.transformationUID = transformationId;
+ }
+
+ /** Returns the user-provided hex hash of this node, or {@code null} if none was set. */
+ public String getUserHash() {
+ return userHash;
+ }
+
+ /**
+ * Sets the user-provided hex hash of this node.
+ *
+ * <p>NOTE(review): unlike {@code setTransformationUID}, this setter is public although the visible caller
+ * (StreamGraph) is in the same package — confirm whether public access is intended.
+ */
+ public void setUserHash(String userHash) {
+ this.userHash = userHash;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index f562b98..e306f30 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -51,8 +51,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -89,7 +89,7 @@ public class StreamingJobGraphGenerator {
public StreamingJobGraphGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
- this.legacyStreamGraphHashers = Collections.<StreamGraphHasher>singletonList(new StreamGraphHasherV1());
+ // Besides the default (V2) hasher, the V1 hasher and the user-hash hasher are consulted, presumably to
+ // supply alternative JobVertexIDs. StreamGraphUserHashHasher only yields entries for nodes given a hash
+ // via setUidHash.
+ this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
}
private void init() {
@@ -185,7 +185,7 @@ public class StreamingJobGraphGenerator {
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
- if (isChainable(outEdge)) {
+ if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
@@ -426,7 +426,7 @@ public class StreamingJobGraphGenerator {
}
}
- private boolean isChainable(StreamEdge edge) {
+ public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = edge.getSourceVertex();
StreamNode downStreamVertex = edge.getTargetVertex();
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index e674619..f7aecdb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -99,11 +99,13 @@ public abstract class StreamTransformation<T> {
// This is used to assign a unique ID to every StreamTransformation
protected static Integer idCounter = 0;
+
public static int getNewNodeId() {
idCounter++;
return idCounter;
}
+
protected final int id;
protected String name;
@@ -130,6 +132,8 @@ public abstract class StreamTransformation<T> {
*/
private String uid;
+ private String userProvidedNodeHash;
+
protected long bufferTimeout = -1;
private String slotSharingGroup;
@@ -205,7 +209,45 @@ public abstract class StreamTransformation<T> {
}
/**
- * Sets an ID for this {@link StreamTransformation}.
+ * Sets a user-provided hash for this operator. This will be used AS IS to create the JobVertexID.
+ *
+ * <p>The user-provided hash is an alternative to the generated hashes that is considered when identifying an
+ * operator through the default hash mechanism fails (e.g. because of changes between Flink versions).
+ *
+ * <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
+ * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
+ * assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will let your job fail.
+ *
+ * <p>A use case for this is migration between Flink versions or changing the job in a way that changes the
+ * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
+ * obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
+ *
+ * @param uidHash The user-provided hash for this operator; must be a non-null string of exactly 32 hexadecimal
+ * characters. This will become the JobVertexID, which is shown in the logs and web UI.
+ * @throws NullPointerException If {@code uidHash} is null.
+ * @throws IllegalArgumentException If {@code uidHash} is not a 32-character hex string.
+ */
+ public void setUidHash(String uidHash) {
+
+ Preconditions.checkNotNull(uidHash);
+ Preconditions.checkArgument(uidHash.matches("^[0-9A-Fa-f]{32}$"),
+ "Node hash must be a 32 character String that describes a hex code. Found: " + uidHash);
+
+ this.userProvidedNodeHash = uidHash;
+ }
+
+ /**
+ * Gets the user-provided hash.
+ *
+ * @return The user-provided hash, or {@code null} if {@link #setUidHash(String)} was never called.
+ */
+ public String getUserProvidedNodeHash() {
+ return userProvidedNodeHash;
+ }
+
+ /**
+ * Sets an ID for this {@link StreamTransformation}. This will later be hashed to a uidHash which is then used to
+ * create the JobVertexID (that is shown in logs and the web ui).
*
* <p>The specified ID is used to assign the same operator ID across job
* submissions (for example when starting a job from a savepoint).
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index 340981b..05aa694 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -32,8 +32,10 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
import org.junit.Test;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -45,6 +47,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests the {@link StreamNode} hash assignment during translation from {@link StreamGraph} to
@@ -422,6 +425,43 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
env.getStreamGraph().getJobGraph();
}
+ /**
+ * Checks that user-provided hashes end up as alternative IDs of the corresponding job vertices.
+ */
+ @Test
+ public void testUserProvidedHashing() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+ List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
+
+ // The source (head of the src -> map -> filter chain) gets the first hash, the reduce the second one.
+ env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
+ .map(new NoOpMapFunction())
+ .filter(new NoOpFilterFunction())
+ .keyBy(new NoOpKeySelector())
+ .reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ // NOTE(review): relies on getVertices() yielding the reduce vertex before the source vertex.
+ int idx = 1;
+ for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
+ // The user-provided hash is expected at position 1 of the ID alternatives.
+ // JUnit's assertEquals takes (expected, actual), so the user hash goes first.
+ Assert.assertEquals(userHashes.get(idx), jobVertex.getIdAlternatives().get(1).toString());
+ --idx;
+ }
+ }
+
+ /**
+ * Checks that assigning user-provided hashes to operators chained behind the source (here map and filter)
+ * is rejected with an {@link UnsupportedOperationException} when the JobGraph is created.
+ */
+ @Test
+ public void testUserProvidedHashingOnChainNotSupported() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+ env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
+ .map(new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
+ .filter(new NoOpFilterFunction()).setUidHash("cccccccccccccccccccccccccccccccc")
+ .keyBy(new NoOpKeySelector())
+ .reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
+
+ try {
+ env.getStreamGraph().getJobGraph();
+ fail();
+ } catch (UnsupportedOperationException ignored) {
+ // expected: user hashes on intermediate chain nodes are not supported
+ }
+ }
+
// ------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0de2bc35/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index dbc91bd..ba92f86 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -201,6 +201,30 @@ class DataStream[T](stream: JavaStream[T]) {
}
/**
+ * Sets a user-provided hash for this operator. This will be used AS IS to create
+ * the JobVertexID.
+ *
+ * <p>The user-provided hash is an alternative to the generated hashes that is
+ * considered when identifying an operator through the default hash mechanism fails
+ * (e.g. because of changes between Flink versions).
+ *
+ * <p><strong>Important</strong>: this should be used as a workaround or for
+ * troubleshooting. The provided hash needs to be unique per transformation and job.
+ * Otherwise, job submission will fail. Furthermore, you cannot assign a user-specified
+ * hash to intermediate nodes in an operator chain; trying to do so will let your job fail.
+ *
+ * <p>Only streams backed by a SingleOutputStreamOperator support this.
+ *
+ * @param hash the user-provided hash for this operator (a 32-character hex string).
+ * @return The operator with the user-provided hash.
+ * @throws UnsupportedOperationException if this stream is not backed by an operator.
+ */
+ @PublicEvolving
+ def setUidHash(hash: String) : DataStream[T] = javaStream match {
+ case stream : SingleOutputStreamOperator[T] =>
+ asScalaStream(stream.setUidHash(hash))
+ case _ => throw new UnsupportedOperationException("Only supported for operators.")
+ }
+
+ /**
* Turns off chaining for this operator so thread co-location will not be
* used as an optimization. </p> Chaining can be turned off for the whole
* job by [[StreamExecutionEnvironment.disableOperatorChaining()]]