You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/02 13:18:43 UTC
[3/8] flink git commit: [FLINK-9216][Streaming] Fix comparator violation
[FLINK-9216][Streaming] Fix comparator violation
This closes #5878.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3242214b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3242214b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3242214b
Branch: refs/heads/master
Commit: 3242214bac3719d057aeaeb34259039dbbf09fb2
Parents: dee6394
Author: Xpray <le...@gmail.com>
Authored: Mon Apr 23 15:37:39 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed May 2 15:18:06 2018 +0200
----------------------------------------------------------------------
.../streaming/api/graph/JSONGenerator.java | 12 ++++++----
.../api/graph/StreamGraphGeneratorTest.java | 24 ++++++++++++++++++++
2 files changed, 31 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3242214b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
index 263e0aa..3f82cf3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -61,14 +61,16 @@ public class JSONGenerator {
List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
Collections.sort(operatorIDs, new Comparator<Integer>() {
@Override
- public int compare(Integer o1, Integer o2) {
+ public int compare(Integer idOne, Integer idTwo) {
+ boolean isIdOneSinkId = streamGraph.getSinkIDs().contains(idOne);
+ boolean isIdTwoSinkId = streamGraph.getSinkIDs().contains(idTwo);
// put sinks at the back
- if (streamGraph.getSinkIDs().contains(o1)) {
+ if (isIdOneSinkId == isIdTwoSinkId) {
+ return idOne.compareTo(idTwo);
+ } else if (isIdOneSinkId) {
return 1;
- } else if (streamGraph.getSinkIDs().contains(o2)) {
- return -1;
} else {
- return o1 - o2;
+ return -1;
}
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/3242214b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 8149d24..d10fb3c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
@@ -413,6 +414,29 @@ public class StreamGraphGeneratorTest {
StreamPartitioner<?> streamPartitioner2 = keyedResultNode.getInEdges().get(1).getPartitioner();
}
+ /**
+ * Tests that the JSON generated by JSONGenerator meets two requirements:
+ * 1. sink nodes are at the back
+ * 2. if both nodes are sink nodes or neither of them is a sink node, they are sorted by id.
+ */
+ @Test
+ public void testSinkIdComparison() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Integer> source = env.fromElements(1, 2, 3);
+ for (int i = 0; i < 32; i++) {
+ if (i % 2 == 0) {
+ source.addSink(new SinkFunction<Integer>() {
+ @Override
+ public void invoke(Integer value) throws Exception {}
+ });
+ } else {
+ source.map(x -> x + 1);
+ }
+ }
+ // IllegalArgumentException will be thrown without FLINK-9216
+ env.getStreamGraph().getStreamingPlanAsJSON();
+ }
+
private static class OutputTypeConfigurableOperationWithTwoInputs
extends AbstractStreamOperator<Integer>
implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {