You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2019/01/19 13:55:35 UTC
[flink] branch master updated: [FLINK-11256][Streaming] Improve StreamEdge to reduce the sizes of JobGraph
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a7eb845 [FLINK-11256][Streaming] Improve StreamEdge to reduce the sizes of JobGraph
a7eb845 is described below
commit a7eb845e812ab0355854d3cf26843179a5a8b597
Author: sunhaibotb <su...@163.com>
AuthorDate: Thu Jan 3 16:37:21 2019 +0800
[FLINK-11256][Streaming] Improve StreamEdge to reduce the sizes of JobGraph
This closes #7403
---
.../flink/streaming/api/graph/StreamEdge.java | 38 ++++++++++++----------
.../flink/streaming/api/graph/StreamGraph.java | 12 +++++--
.../streaming/api/graph/StreamGraphHasherV2.java | 22 +++++++------
.../api/graph/StreamingJobGraphGenerator.java | 4 +--
.../api/graph/StreamGraphGeneratorTest.java | 2 +-
.../test/streaming/runtime/IterateITCase.java | 17 +++++-----
6 files changed, 55 insertions(+), 40 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 2e89932..33a16b0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -36,8 +36,8 @@ public class StreamEdge implements Serializable {
private final String edgeId;
- private final StreamNode sourceVertex;
- private final StreamNode targetVertex;
+ private final int sourceId;
+ private final int targetId;
/**
* The type number of the input for co-tasks.
@@ -60,33 +60,37 @@ public class StreamEdge implements Serializable {
*/
private StreamPartitioner<?> outputPartitioner;
+ /**
+ * The name of the operator in the source vertex.
+ */
+ private final String sourceOperatorName;
+
+ /**
+ * The name of the operator in the target vertex.
+ */
+ private final String targetOperatorName;
+
public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) {
- this.sourceVertex = sourceVertex;
- this.targetVertex = targetVertex;
+ this.sourceId = sourceVertex.getId();
+ this.targetId = targetVertex.getId();
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
this.outputTag = outputTag;
+ this.sourceOperatorName = sourceVertex.getOperatorName();
+ this.targetOperatorName = targetVertex.getOperatorName();
this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
+ "_" + outputPartitioner;
}
- public StreamNode getSourceVertex() {
- return sourceVertex;
- }
-
- public StreamNode getTargetVertex() {
- return targetVertex;
- }
-
public int getSourceId() {
- return sourceVertex.getId();
+ return sourceId;
}
public int getTargetId() {
- return targetVertex.getId();
+ return targetId;
}
public int getTypeNumber() {
@@ -130,8 +134,8 @@ public class StreamEdge implements Serializable {
@Override
public String toString() {
- return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
- + ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
- + ", outputTag=" + outputTag + ')';
+ return "(" + (sourceOperatorName + "-" + sourceId) + " -> " + (targetOperatorName + "-" + targetId)
+ + ", typeNumber=" + typeNumber + ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
+ + ", outputTag=" + outputTag + ')';
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 46a4ce2..f4950ec 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -635,9 +635,17 @@ public class StreamGraph extends StreamingPlan {
return iterationSourceSinkPairs;
}
+ public StreamNode getSourceVertex(StreamEdge edge) {
+ return streamNodes.get(edge.getSourceId());
+ }
+
+ public StreamNode getTargetVertex(StreamEdge edge) {
+ return streamNodes.get(edge.getTargetId());
+ }
+
private void removeEdge(StreamEdge edge) {
- edge.getSourceVertex().getOutEdges().remove(edge);
- edge.getTargetVertex().getInEdges().remove(edge);
+ getSourceVertex(edge).getOutEdges().remove(edge);
+ getTargetVertex(edge).getInEdges().remove(edge);
}
private void removeVertex(StreamNode toRemove) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
index a87f49f..0ab3e8b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
@@ -109,10 +109,10 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
// Generate the hash code. Because multiple path exist to each
// node, we might not have all required inputs available to
// generate the hash code.
- if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
+ if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled(), streamGraph)) {
// Add the child nodes
for (StreamEdge outEdge : currentNode.getOutEdges()) {
- StreamNode child = outEdge.getTargetVertex();
+ StreamNode child = streamGraph.getTargetVertex(outEdge);
if (!visited.contains(child.getId())) {
remaining.add(child);
@@ -145,7 +145,8 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
StreamNode node,
HashFunction hashFunction,
Map<Integer, byte[]> hashes,
- boolean isChainingEnabled) {
+ boolean isChainingEnabled,
+ StreamGraph streamGraph) {
// Check for user-specified ID
String userSpecifiedHash = node.getTransformationUID();
@@ -162,7 +163,7 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
}
Hasher hasher = hashFunction.newHasher();
- byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
+ byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled, streamGraph);
if (hashes.put(node.getId(), hash) != null) {
// Sanity check
@@ -211,7 +212,8 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
StreamNode node,
Hasher hasher,
Map<Integer, byte[]> hashes,
- boolean isChainingEnabled) {
+ boolean isChainingEnabled,
+ StreamGraph streamGraph) {
// Include stream node to hash. We use the current size of the computed
// hashes as the ID. We cannot use the node's ID, because it is
@@ -221,7 +223,7 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
// Include chained nodes to hash
for (StreamEdge outEdge : node.getOutEdges()) {
- if (isChainable(outEdge, isChainingEnabled)) {
+ if (isChainable(outEdge, isChainingEnabled, streamGraph)) {
// Use the hash size again, because the nodes are chained to
// this node. This does not add a hash for the chained nodes.
@@ -239,7 +241,7 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
// Sanity check
if (otherHash == null) {
throw new IllegalStateException("Missing hash for input node "
- + inEdge.getSourceVertex() + ". Cannot generate hash for "
+ + streamGraph.getSourceVertex(inEdge) + ". Cannot generate hash for "
+ node + ".");
}
@@ -279,9 +281,9 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
hasher.putInt(id);
}
- private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
- StreamNode upStreamVertex = edge.getSourceVertex();
- StreamNode downStreamVertex = edge.getTargetVertex();
+ private boolean isChainable(StreamEdge edge, boolean isChainingEnabled, StreamGraph streamGraph) {
+ StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
+ StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperator<?> headOperator = upStreamVertex.getOperator();
StreamOperator<?> outOperator = downStreamVertex.getOperator();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index bc0377b..8465939 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -520,8 +520,8 @@ public class StreamingJobGraphGenerator {
}
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
- StreamNode upStreamVertex = edge.getSourceVertex();
- StreamNode downStreamVertex = edge.getTargetVertex();
+ StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
+ StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperator<?> headOperator = upStreamVertex.getOperator();
StreamOperator<?> outOperator = downStreamVertex.getOperator();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 3ceb21a..0ae483e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -168,7 +168,7 @@ public class StreamGraphGeneratorTest {
// verify that only last partitioning takes precedence
assertTrue(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
- assertEquals(rebalanceMap.getId(), graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getSourceVertex().getId());
+ assertEquals(rebalanceMap.getId(), graph.getSourceVertex(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0)).getId());
// verify that partitioning in unions is preserved and that it works across split/select
assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index 3b3c7ff..332584d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -200,7 +200,8 @@ public class IterateITCase extends AbstractTestBase {
assertEquals(2, graph.getIterationSourceSinkPairs().size());
for (Tuple2<StreamNode, StreamNode> sourceSinkPair: graph.getIterationSourceSinkPairs()) {
- assertEquals(sourceSinkPair.f0.getOutEdges().get(0).getTargetVertex(), sourceSinkPair.f1.getInEdges().get(0).getSourceVertex());
+ assertEquals(graph.getTargetVertex(sourceSinkPair.f0.getOutEdges().get(0)),
+ graph.getSourceVertex(sourceSinkPair.f1.getInEdges().get(0)));
}
}
@@ -244,9 +245,9 @@ public class IterateITCase extends AbstractTestBase {
assertEquals(itSource.getParallelism(), itSink.getParallelism());
for (StreamEdge edge : itSource.getOutEdges()) {
- if (edge.getTargetVertex().getOperatorName().equals("IterRebalanceMap")) {
+ if (graph.getTargetVertex(edge).getOperatorName().equals("IterRebalanceMap")) {
assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
- } else if (edge.getTargetVertex().getOperatorName().equals("IterForwardMap")) {
+ } else if (graph.getTargetVertex(edge).getOperatorName().equals("IterForwardMap")) {
assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
}
}
@@ -331,16 +332,16 @@ public class IterateITCase extends AbstractTestBase {
assertEquals(itSource.getParallelism(), itSink.getParallelism());
for (StreamEdge edge : itSource.getOutEdges()) {
- if (edge.getTargetVertex().getOperatorName().equals("map1")) {
+ if (graph.getTargetVertex(edge).getOperatorName().equals("map1")) {
assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
- assertEquals(4, edge.getTargetVertex().getParallelism());
- } else if (edge.getTargetVertex().getOperatorName().equals("shuffle")) {
+ assertEquals(4, graph.getTargetVertex(edge).getParallelism());
+ } else if (graph.getTargetVertex(edge).getOperatorName().equals("shuffle")) {
assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
- assertEquals(2, edge.getTargetVertex().getParallelism());
+ assertEquals(2, graph.getTargetVertex(edge).getParallelism());
}
}
for (StreamEdge edge : itSink.getInEdges()) {
- String tailName = edge.getSourceVertex().getOperatorName();
+ String tailName = graph.getSourceVertex(edge).getOperatorName();
if (tailName.equals("split")) {
assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
assertTrue(edge.getSelectedNames().contains("even"));