You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/20 05:40:00 UTC
[04/18] flink git commit: [FLINK-5380] Fix task metrics reuse for single-operator chains
[FLINK-5380] Fix task metrics reuse for single-operator chains
This closes #3068.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb059157
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb059157
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb059157
Branch: refs/heads/master
Commit: cb05915759b1d5ea4dbfcdd3ff76dcfd9cebe601
Parents: 9945904
Author: zentol <ch...@apache.org>
Authored: Thu Jan 5 14:37:03 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 19 23:57:20 2017 +0100
----------------------------------------------------------------------
.../api/graph/StreamingJobGraphGenerator.java | 6 ++--
.../graph/StreamingJobGraphGeneratorTest.java | 38 ++++++++++++++++++++
2 files changed, 41 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cb059157/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1bfaf3f..f562b98 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -234,9 +234,9 @@ public class StreamingJobGraphGenerator {
config.setChainIndex(chainIndex);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
- if (chainableOutputs.isEmpty()) {
- config.setChainEnd();
- }
+ }
+ if (chainableOutputs.isEmpty()) {
+ config.setChainEnd();
}
return transitiveOutEdges;
http://git-wip-us.apache.org/repos/asf/flink/blob/cb059157/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index b817c93..4d462d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,11 +32,13 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
+import java.util.Map;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
@SuppressWarnings("serial")
public class StreamingJobGraphGeneratorTest extends TestLogger {
@@ -170,4 +173,39 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval());
}
+
+ /**
+ * Verifies that the chain start/end is correctly set.
+ */
+ @Test
+ public void testChainStartEndSetting() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ // fromElements -> CHAIN(Map -> Print)
+ env.fromElements(1, 2, 3)
+ .map(new MapFunction<Integer, Integer>() {
+ @Override
+ public Integer map(Integer value) throws Exception {
+ return value;
+ }
+ })
+ .print();
+ JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+
+ JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
+ JobVertex mapPrintVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+
+ StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
+ StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());
+ Map<Integer, StreamConfig> chainedConfigs = mapConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader());
+ StreamConfig printConfig = chainedConfigs.get(3);
+
+ assertTrue(sourceConfig.isChainStart());
+ assertTrue(sourceConfig.isChainEnd());
+
+ assertTrue(mapConfig.isChainStart());
+ assertFalse(mapConfig.isChainEnd());
+
+ assertFalse(printConfig.isChainStart());
+ assertTrue(printConfig.isChainEnd());
+ }
}