You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/14 11:20:30 UTC
flink git commit: [FLINK-5290] Ensure backwards compatibility of the
hashes used to generate JobVertexIds
Repository: flink
Updated Branches:
refs/heads/master 8a7288ea9 -> 6cfc841b5
[FLINK-5290] Ensure backwards compatibility of the hashes used to generate JobVertexIds
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cfc841b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cfc841b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cfc841b
Branch: refs/heads/master
Commit: 6cfc841b5f5829593cf2993bf3493f9ec42657e6
Parents: 8a7288e
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Dec 8 14:03:47 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 10:53:57 2016 +0100
----------------------------------------------------------------------
.../checkpoint/StateAssignmentOperation.java | 14 +-
.../checkpoint/savepoint/SavepointLoader.java | 12 +
.../executiongraph/ExecutionJobVertex.java | 25 ++
.../runtime/jobgraph/InputFormatVertex.java | 6 +
.../flink/runtime/jobgraph/JobVertex.java | 24 ++
.../executiongraph/LegacyJobVertexIdTest.java | 77 +++++
.../api/graph/StreamGraphHasherV1.java | 293 +++++++++++++++++
.../streaming/api/graph/StreamGraphHasher.java | 36 +++
.../api/graph/StreamGraphHasherV2.java | 315 +++++++++++++++++++
.../flink/streaming/api/graph/StreamNode.java | 10 +-
.../api/graph/StreamingJobGraphGenerator.java | 302 +++---------------
11 files changed, 841 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index f496a07..61a71e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -62,10 +62,22 @@ public class StateAssignmentOperation {
public boolean assignStates() throws Exception {
+ boolean expandedToLegacyIds = false;
+ Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
+
for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) {
TaskState taskState = taskGroupStateEntry.getValue();
- ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey());
+ ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
+
+ // The first time we cannot find the execution job vertex for an id, we also consider alternative ids,
+ // for example those generated by older Flink versions, to provide backwards compatibility.
+ if (executionJobVertex == null && !expandedToLegacyIds) {
+ localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
+ executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
+ expandedToLegacyIds = true;
+ logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
+ }
if (executionJobVertex == null) {
if (allowNonRestoredState) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 172e425..d6be482 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -64,10 +64,22 @@ public class SavepointLoader {
Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath, userClassLoader);
final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
+ boolean expandedToLegacyIds = false;
+
// (2) validate it (parallelism, etc)
for (TaskState taskState : savepoint.getTaskStates()) {
+
ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
+ // The first time we cannot find the execution job vertex for an id, we also consider alternative ids,
+ // for example those generated by older Flink versions, to provide backwards compatibility.
+ if (executionJobVertex == null && !expandedToLegacyIds) {
+ tasks = ExecutionJobVertex.includeLegacyJobVertexIDs(tasks);
+ executionJobVertex = tasks.get(taskState.getJobVertexID());
+ expandedToLegacyIds = true;
+ LOG.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
+ }
+
if (executionJobVertex != null) {
if (executionJobVertex.getMaxParallelism() == taskState.getMaxParallelism()) {
taskStates.put(taskState.getJobVertexID(), taskState);
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index c066ca8..7f2545c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -617,6 +617,31 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
}
+ /**
+ * Builds a lookup map that resolves both primary and alternative (legacy) job vertex ids.
+ *
+ * <p>The returned map is a new map; {@code tasks} itself is only read. It contains every entry of
+ * {@code tasks} and, for every non-null vertex whose {@link JobVertex} is non-null, one extra entry per id
+ * returned by {@link JobVertex#getIdAlternatives()}, each pointing at that same vertex.
+ *
+ * @param tasks mapping from primary job vertex id to execution job vertex
+ * @return a new map that additionally resolves every alternative id
+ * (fails the {@code Preconditions.checkState} check if an alternative id is already mapped to a different vertex)
+ */
+ public static Map<JobVertexID, ExecutionJobVertex> includeLegacyJobVertexIDs(
+ Map<JobVertexID, ExecutionJobVertex> tasks) {
+
+ final Map<JobVertexID, ExecutionJobVertex> result = new HashMap<>(2 * tasks.size());
+ // the primary ids always resolve
+ result.putAll(tasks);
+
+ // register every alternative id under the vertex that declares it
+ for (ExecutionJobVertex vertex : tasks.values()) {
+ if (vertex == null) {
+ continue;
+ }
+ final JobVertex declaringVertex = vertex.getJobVertex();
+ if (declaringVertex == null) {
+ continue;
+ }
+ for (JobVertexID legacyId : declaringVertex.getIdAlternatives()) {
+ final ExecutionJobVertex previous = result.put(legacyId, vertex);
+ // an id may only ever resolve to one vertex
+ Preconditions.checkState(previous == null || previous.equals(vertex),
+ "Ambiguous jobvertex id detected during expansion to legacy ids.");
+ }
+ }
+
+ return result;
+ }
+
@Override
public ArchivedExecutionJobVertex archive() {
return new ArchivedExecutionJobVertex(this);
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index 781108c..c4fc907 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import java.util.List;
+
public class InputFormatVertex extends JobVertex {
private static final long serialVersionUID = 1L;
@@ -36,6 +38,10 @@ public class InputFormatVertex extends JobVertex {
public InputFormatVertex(String name, JobVertexID id) {
super(name, id);
}
+
+ /**
+ * Creates a new input format vertex with the given name, primary id, and alternative ids.
+ * All arguments are passed unchanged to {@link JobVertex#JobVertex(String, JobVertexID, List)}.
+ *
+ * @param name The name of the vertex.
+ * @param id The primary id of the vertex.
+ * @param alternativeIds Additional ids under which this vertex may also be identified.
+ */
+ public InputFormatVertex(String name, JobVertexID id, List<JobVertexID> alternativeIds) {
+ super(name, id, alternativeIds);
+ }
public void setFormatDescription(String formatDescription) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 2bda9d8..d24100e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -48,6 +48,8 @@ public class JobVertex implements java.io.Serializable {
/** The ID of the vertex. */
private final JobVertexID id;
+ /** Alternative IDs of this vertex (for example IDs generated by an older Flink version); empty if none were given. */
+ private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();
+
/** List of produced data sets, one per writer */
private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
@@ -117,6 +119,19 @@ public class JobVertex implements java.io.Serializable {
this.id = id == null ? new JobVertexID() : id;
}
+ /**
+ * Constructs a new job vertex with the given name, primary id, and alternative ids.
+ *
+ * <p>The alternative ids are additional ids under which this vertex may be identified, for
+ * example ids generated by an older Flink version. They are exposed via {@link #getIdAlternatives()}.
+ *
+ * @param name The name of the new job vertex. If {@code null}, the default name is used.
+ * @param primaryId The id of the job vertex. If {@code null}, a new {@code JobVertexID} is created.
+ * @param alternativeIds The alternative ids of the job vertex. May be {@code null}, which is
+ * treated like an empty list.
+ */
+ public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds) {
+ this.name = name == null ? DEFAULT_NAME : name;
+ this.id = primaryId == null ? new JobVertexID() : primaryId;
+ // tolerate a null list, consistent with the null handling of name and primaryId above
+ if (alternativeIds != null) {
+ this.idAlternatives.addAll(alternativeIds);
+ }
+ }
+
// --------------------------------------------------------------------------------------------
/**
@@ -129,6 +144,15 @@ public class JobVertex implements java.io.Serializable {
}
/**
+ * Returns a list of all alternative IDs of this job vertex.
+ *
+ * @return List of all alternative IDs for this job vertex
+ */
+ public List<JobVertexID> getIdAlternatives() {
+ // NOTE(review): this returns the vertex's internal mutable list, not a copy; callers that
+ // modify it change this vertex's alternative ids.
+ return idAlternatives;
+ }
+
+ /**
* Returns the name of the vertex.
*
* @return The name of the vertex.
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
new file mode 100644
index 0000000..44dc0a4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static org.mockito.Mockito.mock;
+
+public class LegacyJobVertexIdTest {
+
+ /**
+ * Checks that {@link ExecutionJobVertex#includeLegacyJobVertexIDs(Map)} makes a vertex reachable under each
+ * of its alternative (legacy) ids while keeping the mapping for its primary id.
+ */
+ @Test
+ public void testIntroduceLegacyJobVertexIds() throws Exception {
+ final JobVertexID primaryId = new JobVertexID();
+ final JobVertexID firstLegacyId = new JobVertexID();
+ final JobVertexID secondLegacyId = new JobVertexID();
+
+ final JobVertex vertex = new JobVertex("test", primaryId, Arrays.asList(firstLegacyId, secondLegacyId));
+ vertex.setInvokableClass(AbstractInvokable.class);
+
+ final ExecutionJobVertex ejv = new ExecutionJobVertex(createExecutionGraph(), vertex, 1, Time.seconds(1));
+
+ Map<JobVertexID, ExecutionJobVertex> lookup = new HashMap<>();
+ lookup.put(ejv.getJobVertexId(), ejv);
+
+ // before the expansion only the primary id resolves
+ Assert.assertEquals(ejv, lookup.get(primaryId));
+ for (JobVertexID legacyId : Arrays.asList(firstLegacyId, secondLegacyId)) {
+ Assert.assertNull(lookup.get(legacyId));
+ }
+
+ lookup = ExecutionJobVertex.includeLegacyJobVertexIDs(lookup);
+
+ // after the expansion all three ids resolve to the same vertex
+ Assert.assertEquals(3, lookup.size());
+ for (JobVertexID anyId : Arrays.asList(primaryId, firstLegacyId, secondLegacyId)) {
+ Assert.assertEquals(ejv, lookup.get(anyId));
+ }
+ }
+
+ /** Creates a mock-backed ExecutionGraph that only serves as the owner of the vertex under test. */
+ private static ExecutionGraph createExecutionGraph() throws Exception {
+ return new ExecutionGraph(
+ mock(Executor.class),
+ mock(Executor.class),
+ new JobID(),
+ "test",
+ mock(Configuration.class),
+ mock(SerializedValue.class),
+ Time.seconds(1),
+ mock(RestartStrategy.class));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
new file mode 100644
index 0000000..dec0c18
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphHasher;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/**
+ * StreamGraphHasher from Flink 1.1. This contains duplicated code to ensure that the algorithm does not change with
+ * future Flink versions.
+ *
+ * <p>Any change to the traversal order, the hash function or its seed, or the values fed into the hasher changes
+ * the generated hashes (and thereby the JobVertexIDs derived from them), so they would no longer match Flink 1.1.
+ *
+ * <p>DO NOT MODIFY THIS CLASS
+ */
+public class StreamGraphHasherV1 implements StreamGraphHasher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV1.class);
+
+ /**
+ * Computes a hash for every node reachable from the sources, visiting nodes breadth-first starting from
+ * the sources in ascending id order. A node whose inputs are not all hashed yet is deferred and enqueued
+ * again when another of its inputs has been processed.
+ */
+ @Override
+ public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+ // The hash function used to generate the hash
+ // (128-bit Murmur3 with seed 0; must stay identical to Flink 1.1)
+ final HashFunction hashFunction = Hashing.murmur3_128(0);
+ final Map<Integer, byte[]> hashes = new HashMap<>();
+
+ Set<Integer> visited = new HashSet<>();
+ Queue<StreamNode> remaining = new ArrayDeque<>();
+
+ // We need to make the source order deterministic. The source IDs are
+ // not returned in the same order, which means that submitting the same
+ // program twice might result in different traversal, which breaks the
+ // deterministic hash assignment.
+ List<Integer> sources = new ArrayList<>();
+ for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+ sources.add(sourceNodeId);
+ }
+ Collections.sort(sources);
+
+ //
+ // Traverse the graph in a breadth-first manner. Keep in mind that
+ // the graph is not a tree and multiple paths to nodes can exist.
+ //
+
+ // Start with source nodes
+ for (Integer sourceNodeId : sources) {
+ remaining.add(streamGraph.getStreamNode(sourceNodeId));
+ visited.add(sourceNodeId);
+ }
+
+ StreamNode currentNode;
+ while ((currentNode = remaining.poll()) != null) {
+ // Generate the hash code. Because multiple paths exist to each
+ // node, we might not have all required inputs available to
+ // generate the hash code.
+ if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
+ // Add the child nodes
+ for (StreamEdge outEdge : currentNode.getOutEdges()) {
+ StreamNode child = outEdge.getTargetVertex();
+
+ if (!visited.contains(child.getId())) {
+ remaining.add(child);
+ visited.add(child.getId());
+ }
+ }
+ } else {
+ // We will revisit this later: un-mark the node so that it is enqueued
+ // again once another of its inputs has been hashed.
+ visited.remove(currentNode.getId());
+ }
+ }
+
+ return hashes;
+ }
+
+ /**
+ * Generates a hash for the node and returns whether the operation was
+ * successful.
+ *
+ * @param node The node to generate the hash for
+ * @param hashFunction The hash function to use
+ * @param hashes The current state of generated hashes
+ * @param isChainingEnabled Whether chaining is enabled for the stream graph; decides which edges count as chained
+ * @return <code>true</code> if the node hash has been generated.
+ * <code>false</code>, otherwise. If the operation is not successful, the
+ * hash needs to be generated at a later point when all input is available.
+ * @throws UnsupportedOperationException If node has user-specified hash and is
+ * intermediate node of a chain
+ * @throws IllegalArgumentException If a user-specified hash equals a previously generated hash
+ * @throws IllegalStateException If a hash for the node has already been generated (sanity check)
+ */
+ private boolean generateNodeHash(
+ StreamNode node,
+ HashFunction hashFunction,
+ Map<Integer, byte[]> hashes,
+ boolean isChainingEnabled) {
+
+ // Check for user-specified ID
+ String userSpecifiedHash = node.getTransformationId();
+
+ if (userSpecifiedHash == null) {
+ // Check that all input nodes have their hashes computed
+ for (StreamEdge inEdge : node.getInEdges()) {
+ // If the input node has not been visited yet, the current
+ // node will be visited again at a later point when all input
+ // nodes have been visited and their hashes set.
+ if (!hashes.containsKey(inEdge.getSourceId())) {
+ return false;
+ }
+ }
+
+ Hasher hasher = hashFunction.newHasher();
+ byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
+
+ if (hashes.put(node.getId(), hash) != null) {
+ // Sanity check
+ throw new IllegalStateException("Unexpected state. Tried to add node hash " +
+ "twice. This is probably a bug in the JobGraph generator.");
+ }
+
+ return true;
+ } else {
+ // Check that this node is not part of a chain. This is currently
+ // not supported, because the runtime takes the snapshots by the
+ // operator ID of the first vertex in a chain. It's OK if the node
+ // has chained outputs.
+ for (StreamEdge inEdge : node.getInEdges()) {
+ if (isChainable(inEdge, isChainingEnabled)) {
+ throw new UnsupportedOperationException("Cannot assign user-specified hash "
+ + "to intermediate node in chain. This will be supported in future "
+ + "versions of Flink. As a work around start new chain at task "
+ + node.getOperatorName() + ".");
+ }
+ }
+
+ Hasher hasher = hashFunction.newHasher();
+ byte[] hash = generateUserSpecifiedHash(node, hasher);
+
+ // Reject a user-specified hash that equals any hash generated so far
+ // (linear scan over all previously generated hashes).
+ for (byte[] previousHash : hashes.values()) {
+ if (Arrays.equals(previousHash, hash)) {
+ throw new IllegalArgumentException("Hash collision on user-specified ID. " +
+ "Most likely cause is a non-unique ID. Please check that all IDs " +
+ "specified via `uid(String)` are unique.");
+ }
+ }
+
+ if (hashes.put(node.getId(), hash) != null) {
+ // Sanity check
+ throw new IllegalStateException("Unexpected state. Tried to add node hash " +
+ "twice. This is probably a bug in the JobGraph generator.");
+ }
+
+ return true;
+ }
+ }
+
+ /**
+ * Generates a hash from a user-specified ID, i.e. the node's transformation ID encoded as UTF-8.
+ */
+ private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
+ hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
+
+ return hasher.hash().asBytes();
+ }
+
+ /**
+ * Generates a deterministic hash from node-local properties and input and
+ * output edges.
+ *
+ * @param node The node to hash
+ * @param hasher A fresh hasher to feed the node-local properties into
+ * @param hashes The hashes generated so far; must already contain the hashes of all input nodes
+ * @param isChainingEnabled Whether chaining is enabled; decides which output nodes are included
+ * @return The hash of the node
+ * @throws IllegalStateException If the hash of an input node is missing
+ */
+ private byte[] generateDeterministicHash(
+ StreamNode node,
+ Hasher hasher,
+ Map<Integer, byte[]> hashes,
+ boolean isChainingEnabled) {
+
+ // Include stream node to hash. We use the current size of the computed
+ // hashes as the ID. We cannot use the node's ID, because it is
+ // assigned from a static counter. This will result in two identical
+ // programs having different hashes.
+ generateNodeLocalHash(node, hasher, hashes.size());
+
+ // Include chained nodes to hash
+ for (StreamEdge outEdge : node.getOutEdges()) {
+ if (isChainable(outEdge, isChainingEnabled)) {
+ StreamNode chainedNode = outEdge.getTargetVertex();
+
+ // Use the hash size again, because the nodes are chained to
+ // this node. This does not add a hash for the chained nodes.
+ generateNodeLocalHash(chainedNode, hasher, hashes.size());
+ }
+ }
+
+ byte[] hash = hasher.hash().asBytes();
+
+ // Make sure that all input nodes have their hash set before entering
+ // this loop (calling this method).
+ for (StreamEdge inEdge : node.getInEdges()) {
+ byte[] otherHash = hashes.get(inEdge.getSourceId());
+
+ // Sanity check
+ if (otherHash == null) {
+ throw new IllegalStateException("Missing hash for input node "
+ + inEdge.getSourceVertex() + ". Cannot generate hash for "
+ + node + ".");
+ }
+
+ // Fold the input hash in byte by byte. Operator precedence makes this
+ // (hash[j] * 37) ^ otherHash[j]; the result depends on the order of getInEdges().
+ for (int j = 0; j < hash.length; j++) {
+ hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ String udfClassName = "";
+ if (node.getOperator() instanceof AbstractUdfStreamOperator) {
+ udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
+ .getUserFunction().getClass().getName();
+ }
+
+ LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
+ "'" + node.toString() + "' {id: " + node.getId() + ", " +
+ "parallelism: " + node.getParallelism() + ", " +
+ "user function: " + udfClassName + "}");
+ }
+
+ return hash;
+ }
+
+ /**
+ * Chaining condition as duplicated from Flink 1.1. Here it is only used to decide which output nodes are
+ * folded into a node's hash and to reject user-specified hashes on chained (non-head) nodes.
+ */
+ private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
+ StreamNode upStreamVertex = edge.getSourceVertex();
+ StreamNode downStreamVertex = edge.getTargetVertex();
+
+ StreamOperator<?> headOperator = upStreamVertex.getOperator();
+ StreamOperator<?> outOperator = downStreamVertex.getOperator();
+
+ return downStreamVertex.getInEdges().size() == 1
+ && outOperator != null
+ && headOperator != null
+ && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
+ && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
+ && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
+ headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
+ && (edge.getPartitioner() instanceof ForwardPartitioner)
+ && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
+ && isChainingEnabled;
+ }
+
+ /**
+ * Feeds node-local properties into the hasher: the given id, the node's parallelism and, for UDF
+ * operators, the class name of the user function (encoded as UTF-8).
+ */
+ private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
+ hasher.putInt(id);
+
+ hasher.putInt(node.getParallelism());
+
+ if (node.getOperator() instanceof AbstractUdfStreamOperator) {
+ String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
+ .getUserFunction().getClass().getName();
+
+ hasher.putString(udfClassName, Charset.forName("UTF-8"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java
new file mode 100644
index 0000000..866fd1f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Map;
+
+/**
+ * Interface for different implementations of generating hashes over a stream graph.
+ *
+ * <p>The hashes identify nodes across job submissions, so an implementation must yield the same
+ * hashes when the same unchanged program is submitted again.
+ */
+public interface StreamGraphHasher {
+
+ /**
+ * Returns a map with a hash for each {@link StreamNode} of the {@link
+ * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+ * identify nodes across job submissions if they didn't change.
+ *
+ * @param streamGraph The stream graph whose nodes are hashed
+ * @return Map from stream node ID to the hash bytes of that node
+ */
+ Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
new file mode 100644
index 0000000..75b606a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/**
+ * StreamGraphHasher from Flink 1.2. This contains duplicated code to ensure that the algorithm (and therefore the
+ * generated {@link JobVertexID}s) does not change with future Flink versions. In contrast to the hashing previously
+ * inlined in StreamingJobGraphGenerator, the user function class name is not part of the node-local hash.
+ * <p>DO NOT MODIFY THIS CLASS: changing its output changes the JobVertexIDs generated for existing programs.
+ */
+public class StreamGraphHasherV2 implements StreamGraphHasher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV2.class);
+
+	/**
+	 * Returns a map with a hash for each {@link StreamNode} of the {@link
+	 * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
+	 * identify nodes across job submissions if they didn't change.
+	 *
+	 * <p>The complete {@link StreamGraph} is traversed breadth-first, starting from the
+	 * sources in ascending id order. The hash is either computed from the transformation's
+	 * user-specified id (see {@link StreamTransformation#getUid()}) or generated in a deterministic way.
+	 *
+	 * <p>The generated hash is deterministic with respect to:
+	 * <ul>
+	 * <li>the node's position in the traversal (number of hashes generated before it),
+	 * <li>chained output nodes, and
+	 * <li>input nodes hashes.
+	 * </ul>
+	 * Parallelism and the user function class are not hashed directly; parallelism only
+	 * matters insofar as it decides which outputs are chained.
+	 * @param streamGraph The stream graph whose nodes are hashed
+	 * @return A map from {@link StreamNode#id} to hash as 16-byte array.
+	 */
+	@Override
+	public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+		// Murmur3 128-bit hash function (seed 0): every generated hash is 16 bytes long
+		final HashFunction hashFunction = Hashing.murmur3_128(0);
+		final Map<Integer, byte[]> hashes = new HashMap<>();
+
+		Set<Integer> visited = new HashSet<>();
+		Queue<StreamNode> remaining = new ArrayDeque<>();
+
+		// We need to make the source order deterministic. The source IDs are
+		// not returned in the same order, which means that submitting the same
+		// program twice might result in different traversal, which breaks the
+		// deterministic hash assignment.
+		List<Integer> sources = new ArrayList<>();
+		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+			sources.add(sourceNodeId);
+		}
+		Collections.sort(sources);
+
+		//
+		// Traverse the graph in a breadth-first manner. Keep in mind that
+		// the graph is not a tree and multiple paths to nodes can exist.
+		//
+
+		// Start with source nodes
+		for (Integer sourceNodeId : sources) {
+			remaining.add(streamGraph.getStreamNode(sourceNodeId));
+			visited.add(sourceNodeId);
+		}
+
+		StreamNode currentNode;
+		while ((currentNode = remaining.poll()) != null) {
+			// Generate the hash code. Because multiple paths can lead to a
+			// node, not all of its inputs might have a hash yet; in that case
+			// generateNodeHash returns false and no hash is added.
+			if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
+				// Add the child nodes
+				for (StreamEdge outEdge : currentNode.getOutEdges()) {
+					StreamNode child = outEdge.getTargetVertex();
+
+					if (!visited.contains(child.getId())) {
+						remaining.add(child);
+						visited.add(child.getId());
+					}
+				}
+			} else {
+				// Revisit later: dropping it from 'visited' lets a not-yet-hashed input enqueue it again.
+				visited.remove(currentNode.getId());
+			}
+		}
+
+		return hashes;
+	}
+
+	/**
+	 * Generates a hash for the node and returns whether the operation was successful.
+	 *
+	 * @param node The node to generate the hash for
+	 * @param hashFunction The hash function to use
+	 * @param hashes The current state of generated hashes; a successfully generated hash is added to it
+	 * @param isChainingEnabled Whether chaining is enabled for the stream graph
+	 * @return <code>true</code> if the node hash has been generated. <code>false</code> if an input node
+	 * has no hash yet; the hash then needs to be generated later, when all input is available.
+	 * @throws UnsupportedOperationException If node has user-specified hash and is intermediate node of a chain
+	 * @throws IllegalArgumentException If the user-specified hash collides with a previously generated hash
+	 * @throws IllegalStateException If a hash has already been registered for the node
+	 */
+	private boolean generateNodeHash(
+			StreamNode node,
+			HashFunction hashFunction,
+			Map<Integer, byte[]> hashes,
+			boolean isChainingEnabled) {
+
+		// Check for user-specified ID
+		String userSpecifiedHash = node.getTransformationId();
+
+		if (userSpecifiedHash == null) {
+			// Check that all input nodes have their hashes computed
+			for (StreamEdge inEdge : node.getInEdges()) {
+				// If the input node has not been visited yet, the current
+				// node will be visited again at a later point when all input
+				// nodes have been visited and their hashes set.
+				if (!hashes.containsKey(inEdge.getSourceId())) {
+					return false;
+				}
+			}
+
+			Hasher hasher = hashFunction.newHasher();
+			byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
+
+			if (hashes.put(node.getId(), hash) != null) {
+				// Sanity check
+				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
+						"twice. This is probably a bug in the JobGraph generator.");
+			}
+
+			return true;
+		} else {
+			// Check that this node is not part of a chain. This is currently
+			// not supported, because the runtime takes the snapshots by the
+			// operator ID of the first vertex in a chain. It's OK if the node
+			// has chained outputs.
+			for (StreamEdge inEdge : node.getInEdges()) {
+				if (isChainable(inEdge, isChainingEnabled)) {
+					throw new UnsupportedOperationException("Cannot assign user-specified hash "
+							+ "to intermediate node in chain. This will be supported in future "
+							+ "versions of Flink. As a work around start new chain at task "
+							+ node.getOperatorName() + ".");
+				}
+			}
+
+			Hasher hasher = hashFunction.newHasher();
+			byte[] hash = generateUserSpecifiedHash(node, hasher);
+
+			for (byte[] previousHash : hashes.values()) {
+				if (Arrays.equals(previousHash, hash)) {
+					throw new IllegalArgumentException("Hash collision on user-specified ID. " +
+							"Most likely cause is a non-unique ID. Please check that all IDs " +
+							"specified via `uid(String)` are unique.");
+				}
+			}
+
+			if (hashes.put(node.getId(), hash) != null) {
+				// Sanity check
+				throw new IllegalStateException("Unexpected state. Tried to add node hash " +
+						"twice. This is probably a bug in the JobGraph generator.");
+			}
+
+			return true;
+		}
+	}
+
+	/**
+	 * Generates a hash from the node's user-specified transformation ID, encoded as UTF-8.
+	 */
+	private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
+		hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
+
+		return hasher.hash().asBytes();
+	}
+
+	/**
+	 * Generates a deterministic hash from the node's traversal position, its chained
+	 * output nodes and the hashes of all of its input nodes.
+	 */
+	private byte[] generateDeterministicHash(
+			StreamNode node,
+			Hasher hasher,
+			Map<Integer, byte[]> hashes,
+			boolean isChainingEnabled) {
+
+		// Include stream node to hash. We use the current size of the computed
+		// hashes as the ID. We cannot use the node's ID, because it is
+		// assigned from a static counter. This will result in two identical
+		// programs having different hashes.
+		generateNodeLocalHash(node, hasher, hashes.size());
+
+		// Include chained nodes to hash
+		for (StreamEdge outEdge : node.getOutEdges()) {
+			if (isChainable(outEdge, isChainingEnabled)) {
+				StreamNode chainedNode = outEdge.getTargetVertex();
+				// Use the hash size again, because the nodes are chained to
+				// this node. This does not add a hash for the chained nodes; since
+				// only the id is hashed, just the number of chained outputs matters.
+				generateNodeLocalHash(chainedNode, hasher, hashes.size());
+			}
+		}
+
+		byte[] hash = hasher.hash().asBytes();
+
+		// All input nodes must have their hash set before this method is
+		// called (ensured by generateNodeHash and re-checked below).
+		for (StreamEdge inEdge : node.getInEdges()) {
+			byte[] otherHash = hashes.get(inEdge.getSourceId());
+
+			// Sanity check
+			if (otherHash == null) {
+				throw new IllegalStateException("Missing hash for input node "
+						+ inEdge.getSourceVertex() + ". Cannot generate hash for "
+						+ node + ".");
+			}
+			// Fold in the input hash byte by byte: (hash * 37) ^ input, so the result can depend on the in-edge order.
+			for (int j = 0; j < hash.length; j++) {
+				hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			String udfClassName = "";
+			if (node.getOperator() instanceof AbstractUdfStreamOperator) {
+				udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
+						.getUserFunction().getClass().getName();
+			}
+
+			LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
+					"'" + node.toString() + "' {id: " + node.getId() + ", " +
+					"parallelism: " + node.getParallelism() + ", " +
+					"user function: " + udfClassName + "}");
+		}
+
+		return hash;
+	}
+
+	/**
+	 * Applies the {@link Hasher} to the {@link StreamNode} (only node local
+	 * attributes are taken into account). The hasher encapsulates the current
+	 * state of the hash.
+	 *
+	 * <p>In this version the only hashed attribute is the given id; the node itself is
+	 * not inspected (in particular, its user function class is not hashed). The id is
+	 * used instead of {@link StreamNode#id}, because the latter is incremented in a
+	 * static counter and would therefore differ between otherwise identical jobs.
+	 */
+	private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
+		// This resolves conflicts for otherwise identical source nodes. BUT
+		// the generated hash codes depend on the ordering of the nodes in the
+		// stream graph.
+		hasher.putInt(id);
+	}
+	/** Returns whether the edge counts as chainable for hashing purposes, given the chaining setting. */
+	private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
+		StreamNode upStreamVertex = edge.getSourceVertex();
+		StreamNode downStreamVertex = edge.getTargetVertex();
+
+		StreamOperator<?> headOperator = upStreamVertex.getOperator();
+		StreamOperator<?> outOperator = downStreamVertex.getOperator();
+
+		return downStreamVertex.getInEdges().size() == 1
+				&& outOperator != null
+				&& headOperator != null
+				&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
+				&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
+				&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
+					headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
+				&& (edge.getPartitioner() instanceof ForwardPartitioner)
+				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
+				&& isChainingEnabled;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 9051891..7085eeb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -17,10 +17,6 @@
package org.apache.flink.streaming.api.graph;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
@@ -32,6 +28,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.Preconditions;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Class representing the operators in the streaming programs, with all their properties.
*/
@@ -272,7 +272,7 @@ public class StreamNode implements Serializable {
this.stateKeySerializer = stateKeySerializer;
}
- String getTransformationId() {
+ public String getTransformationId() {
return transformationId;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6cfc841b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 48be2e9..da69b49 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -17,15 +17,13 @@
package org.apache.flink.streaming.api.graph;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
@@ -42,10 +40,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -54,10 +50,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.Charset;
-import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -65,10 +58,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.Set;
-
-import static org.apache.flink.util.StringUtils.byteToHexString;
@Internal
public class StreamingJobGraphGenerator {
@@ -94,8 +83,13 @@ public class StreamingJobGraphGenerator {
private Map<Integer, StreamConfig> vertexConfigs;
private Map<Integer, String> chainedNames;
+ private final StreamGraphHasher defaultStreamGraphHasher;
+ private final List<StreamGraphHasher> legacyStreamGraphHashers;
+
public StreamingJobGraphGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
+		this.defaultStreamGraphHasher = new StreamGraphHasherV2(); // produces the primary hashes passed on as 'hashes'
+		this.legacyStreamGraphHashers = Collections.<StreamGraphHasher>singletonList(new StreamGraphHasherV1()); // extra legacy ids for backwards compatibility
}
private void init() {
@@ -118,9 +112,15 @@ public class StreamingJobGraphGenerator {
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
- Map<Integer, byte[]> hashes = traverseStreamGraphAndGenerateHashes();
+ Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
+
+ // Generate legacy version hashes for backwards compatibility
+ List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
+ for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
+ legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
+ }
- setChaining(hashes);
+ setChaining(hashes, legacyHashes);
setPhysicalEdges();
@@ -164,9 +164,9 @@ public class StreamingJobGraphGenerator {
*
* <p>This will recursively create all {@link JobVertex} instances.
*/
- private void setChaining(Map<Integer, byte[]> hashes) {
+ private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
- createChain(sourceNodeId, sourceNodeId, hashes, 0);
+ createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0);
}
}
@@ -174,6 +174,7 @@ public class StreamingJobGraphGenerator {
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
+ List<Map<Integer, byte[]>> legacyHashes,
int chainIndex) {
if (!builtVertices.contains(startNodeId)) {
@@ -192,18 +193,19 @@ public class StreamingJobGraphGenerator {
}
for (StreamEdge chainable : chainableOutputs) {
- transitiveOutEdges.addAll(createChain(startNodeId, chainable.getTargetId(), hashes, chainIndex + 1));
+ transitiveOutEdges.addAll(
+ createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1));
}
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
- createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, 0);
+ createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0);
}
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
StreamConfig config = currentNodeId.equals(startNodeId)
- ? createJobVertex(startNodeId, hashes)
+ ? createJobVertex(startNodeId, hashes, legacyHashes)
: new StreamConfig(new Configuration());
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
@@ -261,7 +263,8 @@ public class StreamingJobGraphGenerator {
private StreamConfig createJobVertex(
Integer streamNodeId,
- Map<Integer, byte[]> hashes) {
+ Map<Integer, byte[]> hashes,
+ List<Map<Integer, byte[]>> legacyHashes) {
JobVertex jobVertex;
StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
@@ -275,16 +278,26 @@ public class StreamingJobGraphGenerator {
JobVertexID jobVertexId = new JobVertexID(hash);
+ List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
+ for (Map<Integer, byte[]> legacyHash : legacyHashes) {
+ hash = legacyHash.get(streamNodeId);
+ if (null != hash) {
+ legacyJobVertexIds.add(new JobVertexID(hash));
+ }
+ }
+
if (streamNode.getInputFormat() != null) {
jobVertex = new InputFormatVertex(
chainedNames.get(streamNodeId),
- jobVertexId);
+ jobVertexId,
+ legacyJobVertexIds);
TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
} else {
jobVertex = new JobVertex(
chainedNames.get(streamNodeId),
- jobVertexId);
+ jobVertexId,
+ legacyJobVertexIds);
}
jobVertex.setInvokableClass(streamNode.getJobVertexClass());
@@ -518,249 +531,4 @@ public class StreamingJobGraphGenerator {
externalizedCheckpointSettings);
jobGraph.setSnapshotSettings(settings);
}
-
- // ------------------------------------------------------------------------
-
- /**
- * Returns a map with a hash for each {@link StreamNode} of the {@link
- * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
- * identify nodes across job submissions if they didn't change.
- *
- * <p>The complete {@link StreamGraph} is traversed. The hash is either
- * computed from the transformation's user-specified id (see
- * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
- *
- * <p>The generated hash is deterministic with respect to:
- * <ul>
- * <li>node-local properties (like parallelism, UDF, node ID),
- * <li>chained output nodes, and
- * <li>input nodes hashes
- * </ul>
- *
- * @return A map from {@link StreamNode#id} to hash as 16-byte array.
- */
- private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
- // The hash function used to generate the hash
- final HashFunction hashFunction = Hashing.murmur3_128(0);
- final Map<Integer, byte[]> hashes = new HashMap<>();
-
- Set<Integer> visited = new HashSet<>();
- Queue<StreamNode> remaining = new ArrayDeque<>();
-
- // We need to make the source order deterministic. The source IDs are
- // not returned in the same order, which means that submitting the same
- // program twice might result in different traversal, which breaks the
- // deterministic hash assignment.
- List<Integer> sources = new ArrayList<>();
- for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
- sources.add(sourceNodeId);
- }
- Collections.sort(sources);
-
- //
- // Traverse the graph in a breadth-first manner. Keep in mind that
- // the graph is not a tree and multiple paths to nodes can exist.
- //
-
- // Start with source nodes
- for (Integer sourceNodeId : sources) {
- remaining.add(streamGraph.getStreamNode(sourceNodeId));
- visited.add(sourceNodeId);
- }
-
- StreamNode currentNode;
- while ((currentNode = remaining.poll()) != null) {
- // Generate the hash code. Because multiple path exist to each
- // node, we might not have all required inputs available to
- // generate the hash code.
- if (generateNodeHash(currentNode, hashFunction, hashes)) {
- // Add the child nodes
- for (StreamEdge outEdge : currentNode.getOutEdges()) {
- StreamNode child = outEdge.getTargetVertex();
-
- if (!visited.contains(child.getId())) {
- remaining.add(child);
- visited.add(child.getId());
- }
- }
- }
- else {
- // We will revisit this later.
- visited.remove(currentNode.getId());
- }
- }
-
- return hashes;
- }
-
- /**
- * Generates a hash for the node and returns whether the operation was
- * successful.
- *
- * @param node The node to generate the hash for
- * @param hashFunction The hash function to use
- * @param hashes The current state of generated hashes
- * @return <code>true</code> if the node hash has been generated.
- * <code>false</code>, otherwise. If the operation is not successful, the
- * hash needs be generated at a later point when all input is available.
- * @throws IllegalStateException If node has user-specified hash and is
- * intermediate node of a chain
- */
- private boolean generateNodeHash(
- StreamNode node,
- HashFunction hashFunction,
- Map<Integer, byte[]> hashes) {
-
- // Check for user-specified ID
- String userSpecifiedHash = node.getTransformationId();
-
- if (userSpecifiedHash == null) {
- // Check that all input nodes have their hashes computed
- for (StreamEdge inEdge : node.getInEdges()) {
- // If the input node has not been visited yet, the current
- // node will be visited again at a later point when all input
- // nodes have been visited and their hashes set.
- if (!hashes.containsKey(inEdge.getSourceId())) {
- return false;
- }
- }
-
- Hasher hasher = hashFunction.newHasher();
- byte[] hash = generateDeterministicHash(node, hasher, hashes);
-
- if (hashes.put(node.getId(), hash) != null) {
- // Sanity check
- throw new IllegalStateException("Unexpected state. Tried to add node hash " +
- "twice. This is probably a bug in the JobGraph generator.");
- }
-
- return true;
- }
- else {
- // Check that this node is not part of a chain. This is currently
- // not supported, because the runtime takes the snapshots by the
- // operator ID of the first vertex in a chain. It's OK if the node
- // has chained outputs.
- for (StreamEdge inEdge : node.getInEdges()) {
- if (isChainable(inEdge)) {
- throw new UnsupportedOperationException("Cannot assign user-specified hash "
- + "to intermediate node in chain. This will be supported in future "
- + "versions of Flink. As a work around start new chain at task "
- + node.getOperatorName() + ".");
- }
- }
-
- Hasher hasher = hashFunction.newHasher();
- byte[] hash = generateUserSpecifiedHash(node, hasher);
-
- for (byte[] previousHash : hashes.values()) {
- if (Arrays.equals(previousHash, hash)) {
- throw new IllegalArgumentException("Hash collision on user-specified ID. " +
- "Most likely cause is a non-unique ID. Please check that all IDs " +
- "specified via `uid(String)` are unique.");
- }
- }
-
- if (hashes.put(node.getId(), hash) != null) {
- // Sanity check
- throw new IllegalStateException("Unexpected state. Tried to add node hash " +
- "twice. This is probably a bug in the JobGraph generator.");
- }
-
- return true;
- }
- }
-
- /**
- * Generates a hash from a user-specified ID.
- */
- private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
- hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
-
- return hasher.hash().asBytes();
- }
-
- /**
- * Generates a deterministic hash from node-local properties and input and
- * output edges.
- */
- private byte[] generateDeterministicHash(
- StreamNode node,
- Hasher hasher,
- Map<Integer, byte[]> hashes) {
-
- // Include stream node to hash. We use the current size of the computed
- // hashes as the ID. We cannot use the node's ID, because it is
- // assigned from a static counter. This will result in two identical
- // programs having different hashes.
- generateNodeLocalHash(node, hasher, hashes.size());
-
- // Include chained nodes to hash
- for (StreamEdge outEdge : node.getOutEdges()) {
- if (isChainable(outEdge)) {
- StreamNode chainedNode = outEdge.getTargetVertex();
-
- // Use the hash size again, because the nodes are chained to
- // this node. This does not add a hash for the chained nodes.
- generateNodeLocalHash(chainedNode, hasher, hashes.size());
- }
- }
-
- byte[] hash = hasher.hash().asBytes();
-
- // Make sure that all input nodes have their hash set before entering
- // this loop (calling this method).
- for (StreamEdge inEdge : node.getInEdges()) {
- byte[] otherHash = hashes.get(inEdge.getSourceId());
-
- // Sanity check
- if (otherHash == null) {
- throw new IllegalStateException("Missing hash for input node "
- + inEdge.getSourceVertex() + ". Cannot generate hash for "
- + node + ".");
- }
-
- for (int j = 0; j < hash.length; j++) {
- hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
- }
- }
-
- if (LOG.isDebugEnabled()) {
- String udfClassName = "";
- if (node.getOperator() instanceof AbstractUdfStreamOperator) {
- udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
- .getUserFunction().getClass().getName();
- }
-
- LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
- "'" + node.toString() + "' {id: " + node.getId() + ", " +
- "parallelism: " + node.getParallelism() + ", " +
- "user function: " + udfClassName + "}");
- }
-
- return hash;
- }
-
- /**
- * Applies the {@link Hasher} to the {@link StreamNode} (only node local
- * attributes are taken into account). The hasher encapsulates the current
- * state of the hash.
- *
- * <p>The specified ID is local to this node. We cannot use the
- * {@link StreamNode#id}, because it is incremented in a static counter.
- * Therefore, the IDs for identical jobs will otherwise be different.
- */
- private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
- // This resolves conflicts for otherwise identical source nodes. BUT
- // the generated hash codes depend on the ordering of the nodes in the
- // stream graph.
- hasher.putInt(id);
-
- if (node.getOperator() instanceof AbstractUdfStreamOperator) {
- String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
- .getUserFunction().getClass().getName();
-
- hasher.putString(udfClassName, Charset.forName("UTF-8"));
- }
- }
}