You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/02 13:18:43 UTC

[3/8] flink git commit: [FLINK-9216][Streaming] Fix comparator violation

[FLINK-9216][Streaming] Fix comparator violation

This closes #5878.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3242214b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3242214b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3242214b

Branch: refs/heads/master
Commit: 3242214bac3719d057aeaeb34259039dbbf09fb2
Parents: dee6394
Author: Xpray <le...@gmail.com>
Authored: Mon Apr 23 15:37:39 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed May 2 15:18:06 2018 +0200

----------------------------------------------------------------------
 .../streaming/api/graph/JSONGenerator.java      | 12 ++++++----
 .../api/graph/StreamGraphGeneratorTest.java     | 24 ++++++++++++++++++++
 2 files changed, 31 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3242214b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
index 263e0aa..3f82cf3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -61,14 +61,16 @@ public class JSONGenerator {
 		List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
 		Collections.sort(operatorIDs, new Comparator<Integer>() {
 			@Override
-			public int compare(Integer o1, Integer o2) {
+			public int compare(Integer idOne, Integer idTwo) {
+				boolean isIdOneSinkId = streamGraph.getSinkIDs().contains(idOne);
+				boolean isIdTwoSinkId = streamGraph.getSinkIDs().contains(idTwo);
 				// put sinks at the back
-				if (streamGraph.getSinkIDs().contains(o1)) {
+				if (isIdOneSinkId == isIdTwoSinkId) {
+					return idOne.compareTo(idTwo);
+				} else if (isIdOneSinkId) {
 					return 1;
-				} else if (streamGraph.getSinkIDs().contains(o2)) {
-					return -1;
 				} else {
-					return o1 - o2;
+					return -1;
 				}
 			}
 		});

http://git-wip-us.apache.org/repos/asf/flink/blob/3242214b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 8149d24..d10fb3c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -413,6 +414,29 @@ public class StreamGraphGeneratorTest {
 		StreamPartitioner<?> streamPartitioner2 = keyedResultNode.getInEdges().get(1).getPartitioner();
 	}
 
+	/**
+	 * Tests that the JSON generated by JSONGenerator meets two requirements:
+	 * 1. sink nodes are placed at the back
+	 * 2. if both nodes are sink nodes, or neither of them is, they are sorted by id.
+	 */
+	@Test
+	public void testSinkIdComparison() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<Integer> source = env.fromElements(1, 2, 3);
+		for (int i = 0; i < 32; i++) {
+			if (i % 2 == 0) {
+				source.addSink(new SinkFunction<Integer>() {
+					@Override
+					public void invoke(Integer value) throws Exception {}
+				});
+			} else {
+				source.map(x -> x + 1);
+			}
+		}
+		// without the FLINK-9216 fix, sorting throws an IllegalArgumentException (comparator contract violation)
+		env.getStreamGraph().getStreamingPlanAsJSON();
+	}
+
 	private static class OutputTypeConfigurableOperationWithTwoInputs
 			extends AbstractStreamOperator<Integer>
 			implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {