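You are viewing a plain text version of this content. The canonical version is available via the link in the original (HTML) archive page; that link is not preserved in this plain-text rendering.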
Posted to commits@flink.apache.org by ji...@apache.org on 2019/01/19 13:55:35 UTC

[flink] branch master updated: [FLINK-11256][Streaming] Improve StreamEdge to reduce the sizes of JobGraph

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a7eb845  [FLINK-11256][Streaming] Improve StreamEdge to reduce the sizes of JobGraph
a7eb845 is described below

commit a7eb845e812ab0355854d3cf26843179a5a8b597
Author: sunhaibotb <su...@163.com>
AuthorDate: Thu Jan 3 16:37:21 2019 +0800

    [FLINK-11256][Streaming] Improve StreamEdge to reduce the sizes of JobGraph
    
    This closes #7403
---
 .../flink/streaming/api/graph/StreamEdge.java      | 38 ++++++++++++----------
 .../flink/streaming/api/graph/StreamGraph.java     | 12 +++++--
 .../streaming/api/graph/StreamGraphHasherV2.java   | 22 +++++++------
 .../api/graph/StreamingJobGraphGenerator.java      |  4 +--
 .../api/graph/StreamGraphGeneratorTest.java        |  2 +-
 .../test/streaming/runtime/IterateITCase.java      | 17 +++++-----
 6 files changed, 55 insertions(+), 40 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 2e89932..33a16b0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -36,8 +36,8 @@ public class StreamEdge implements Serializable {
 
 	private final String edgeId;
 
-	private final StreamNode sourceVertex;
-	private final StreamNode targetVertex;
+	private final int sourceId;
+	private final int targetId;
 
 	/**
 	 * The type number of the input for co-tasks.
@@ -60,33 +60,37 @@ public class StreamEdge implements Serializable {
 	 */
 	private StreamPartitioner<?> outputPartitioner;
 
+	/**
+	 * The name of the operator in the source vertex.
+	 */
+	private final String sourceOperatorName;
+
+	/**
+	 * The name of the operator in the target vertex.
+	 */
+	private final String targetOperatorName;
+
 	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
 			List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) {
-		this.sourceVertex = sourceVertex;
-		this.targetVertex = targetVertex;
+		this.sourceId = sourceVertex.getId();
+		this.targetId = targetVertex.getId();
 		this.typeNumber = typeNumber;
 		this.selectedNames = selectedNames;
 		this.outputPartitioner = outputPartitioner;
 		this.outputTag = outputTag;
+		this.sourceOperatorName = sourceVertex.getOperatorName();
+		this.targetOperatorName = targetVertex.getOperatorName();
 
 		this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
 				+ "_" + outputPartitioner;
 	}
 
-	public StreamNode getSourceVertex() {
-		return sourceVertex;
-	}
-
-	public StreamNode getTargetVertex() {
-		return targetVertex;
-	}
-
 	public int getSourceId() {
-		return sourceVertex.getId();
+		return sourceId;
 	}
 
 	public int getTargetId() {
-		return targetVertex.getId();
+		return targetId;
 	}
 
 	public int getTypeNumber() {
@@ -130,8 +134,8 @@ public class StreamEdge implements Serializable {
 
 	@Override
 	public String toString() {
-		return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
-				+ ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
-				+ ", outputTag=" + outputTag + ')';
+		return "(" + (sourceOperatorName + "-" + sourceId) + " -> " + (targetOperatorName + "-" + targetId)
+			+ ", typeNumber=" + typeNumber + ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
+			+ ", outputTag=" + outputTag + ')';
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 46a4ce2..f4950ec 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -635,9 +635,17 @@ public class StreamGraph extends StreamingPlan {
 		return iterationSourceSinkPairs;
 	}
 
+	public StreamNode getSourceVertex(StreamEdge edge) {
+		return streamNodes.get(edge.getSourceId());
+	}
+
+	public StreamNode getTargetVertex(StreamEdge edge) {
+		return streamNodes.get(edge.getTargetId());
+	}
+
 	private void removeEdge(StreamEdge edge) {
-		edge.getSourceVertex().getOutEdges().remove(edge);
-		edge.getTargetVertex().getInEdges().remove(edge);
+		getSourceVertex(edge).getOutEdges().remove(edge);
+		getTargetVertex(edge).getInEdges().remove(edge);
 	}
 
 	private void removeVertex(StreamNode toRemove) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
index a87f49f..0ab3e8b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
@@ -109,10 +109,10 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
 			// Generate the hash code. Because multiple path exist to each
 			// node, we might not have all required inputs available to
 			// generate the hash code.
-			if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
+			if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled(), streamGraph)) {
 				// Add the child nodes
 				for (StreamEdge outEdge : currentNode.getOutEdges()) {
-					StreamNode child = outEdge.getTargetVertex();
+					StreamNode child = streamGraph.getTargetVertex(outEdge);
 
 					if (!visited.contains(child.getId())) {
 						remaining.add(child);
@@ -145,7 +145,8 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
 			StreamNode node,
 			HashFunction hashFunction,
 			Map<Integer, byte[]> hashes,
-			boolean isChainingEnabled) {
+			boolean isChainingEnabled,
+			StreamGraph streamGraph) {
 
 		// Check for user-specified ID
 		String userSpecifiedHash = node.getTransformationUID();
@@ -162,7 +163,7 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
 			}
 
 			Hasher hasher = hashFunction.newHasher();
-			byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
+			byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled, streamGraph);
 
 			if (hashes.put(node.getId(), hash) != null) {
 				// Sanity check
@@ -211,7 +212,8 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
 			StreamNode node,
 			Hasher hasher,
 			Map<Integer, byte[]> hashes,
-			boolean isChainingEnabled) {
+			boolean isChainingEnabled,
+			StreamGraph streamGraph) {
 
 		// Include stream node to hash. We use the current size of the computed
 		// hashes as the ID. We cannot use the node's ID, because it is
@@ -221,7 +223,7 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
 
 		// Include chained nodes to hash
 		for (StreamEdge outEdge : node.getOutEdges()) {
-			if (isChainable(outEdge, isChainingEnabled)) {
+			if (isChainable(outEdge, isChainingEnabled, streamGraph)) {
 
 				// Use the hash size again, because the nodes are chained to
 				// this node. This does not add a hash for the chained nodes.
@@ -239,7 +241,7 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
 			// Sanity check
 			if (otherHash == null) {
 				throw new IllegalStateException("Missing hash for input node "
-						+ inEdge.getSourceVertex() + ". Cannot generate hash for "
+						+ streamGraph.getSourceVertex(inEdge) + ". Cannot generate hash for "
 						+ node + ".");
 			}
 
@@ -279,9 +281,9 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
 		hasher.putInt(id);
 	}
 
-	private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
-		StreamNode upStreamVertex = edge.getSourceVertex();
-		StreamNode downStreamVertex = edge.getTargetVertex();
+	private boolean isChainable(StreamEdge edge, boolean isChainingEnabled, StreamGraph streamGraph) {
+		StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
+		StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
 
 		StreamOperator<?> headOperator = upStreamVertex.getOperator();
 		StreamOperator<?> outOperator = downStreamVertex.getOperator();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index bc0377b..8465939 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -520,8 +520,8 @@ public class StreamingJobGraphGenerator {
 	}
 
 	public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
-		StreamNode upStreamVertex = edge.getSourceVertex();
-		StreamNode downStreamVertex = edge.getTargetVertex();
+		StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
+		StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
 
 		StreamOperator<?> headOperator = upStreamVertex.getOperator();
 		StreamOperator<?> outOperator = downStreamVertex.getOperator();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 3ceb21a..0ae483e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -168,7 +168,7 @@ public class StreamGraphGeneratorTest {
 
 		// verify that only last partitioning takes precedence
 		assertTrue(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
-		assertEquals(rebalanceMap.getId(), graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getSourceVertex().getId());
+		assertEquals(rebalanceMap.getId(), graph.getSourceVertex(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0)).getId());
 
 		// verify that partitioning in unions is preserved and that it works across split/select
 		assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index 3b3c7ff..332584d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -200,7 +200,8 @@ public class IterateITCase extends AbstractTestBase {
 		assertEquals(2, graph.getIterationSourceSinkPairs().size());
 
 		for (Tuple2<StreamNode, StreamNode> sourceSinkPair: graph.getIterationSourceSinkPairs()) {
-			assertEquals(sourceSinkPair.f0.getOutEdges().get(0).getTargetVertex(), sourceSinkPair.f1.getInEdges().get(0).getSourceVertex());
+			assertEquals(graph.getTargetVertex(sourceSinkPair.f0.getOutEdges().get(0)),
+				graph.getSourceVertex(sourceSinkPair.f1.getInEdges().get(0)));
 		}
 	}
 
@@ -244,9 +245,9 @@ public class IterateITCase extends AbstractTestBase {
 		assertEquals(itSource.getParallelism(), itSink.getParallelism());
 
 		for (StreamEdge edge : itSource.getOutEdges()) {
-			if (edge.getTargetVertex().getOperatorName().equals("IterRebalanceMap")) {
+			if (graph.getTargetVertex(edge).getOperatorName().equals("IterRebalanceMap")) {
 				assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
-			} else if (edge.getTargetVertex().getOperatorName().equals("IterForwardMap")) {
+			} else if (graph.getTargetVertex(edge).getOperatorName().equals("IterForwardMap")) {
 				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
 			}
 		}
@@ -331,16 +332,16 @@ public class IterateITCase extends AbstractTestBase {
 		assertEquals(itSource.getParallelism(), itSink.getParallelism());
 
 		for (StreamEdge edge : itSource.getOutEdges()) {
-			if (edge.getTargetVertex().getOperatorName().equals("map1")) {
+			if (graph.getTargetVertex(edge).getOperatorName().equals("map1")) {
 				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
-				assertEquals(4, edge.getTargetVertex().getParallelism());
-			} else if (edge.getTargetVertex().getOperatorName().equals("shuffle")) {
+				assertEquals(4, graph.getTargetVertex(edge).getParallelism());
+			} else if (graph.getTargetVertex(edge).getOperatorName().equals("shuffle")) {
 				assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
-				assertEquals(2, edge.getTargetVertex().getParallelism());
+				assertEquals(2, graph.getTargetVertex(edge).getParallelism());
 			}
 		}
 		for (StreamEdge edge : itSink.getInEdges()) {
-			String tailName = edge.getSourceVertex().getOperatorName();
+			String tailName = graph.getSourceVertex(edge).getOperatorName();
 			if (tailName.equals("split")) {
 				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
 				assertTrue(edge.getSelectedNames().contains("even"));