You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/20 05:40:00 UTC

[04/18] flink git commit: [FLINK-5380] Fix task metrics reuse for single-operator chains

[FLINK-5380] Fix task metrics reuse for single-operator chains

This closes #3068.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb059157
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb059157
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb059157

Branch: refs/heads/master
Commit: cb05915759b1d5ea4dbfcdd3ff76dcfd9cebe601
Parents: 9945904
Author: zentol <ch...@apache.org>
Authored: Thu Jan 5 14:37:03 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 19 23:57:20 2017 +0100

----------------------------------------------------------------------
 .../api/graph/StreamingJobGraphGenerator.java   |  6 ++--
 .../graph/StreamingJobGraphGeneratorTest.java   | 38 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cb059157/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1bfaf3f..f562b98 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -234,9 +234,9 @@ public class StreamingJobGraphGenerator {
 				config.setChainIndex(chainIndex);
 				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
 				chainedConfigs.get(startNodeId).put(currentNodeId, config);
-				if (chainableOutputs.isEmpty()) {
-					config.setChainEnd();
-				}
+			}
+			if (chainableOutputs.isEmpty()) {
+				config.setChainEnd();
 			}
 
 			return transitiveOutEdges;

http://git-wip-us.apache.org/repos/asf/flink/blob/cb059157/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index b817c93..4d462d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,11 +32,13 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
@@ -170,4 +173,39 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
 		assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval());
 	}
+
+	/**
+	 * Verifies that the chain start/end is correctly set.
+	 */
+	@Test
+	public void testChainStartEndSetting() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// fromElements -> CHAIN(Map -> Print)
+		env.fromElements(1, 2, 3)
+			.map(new MapFunction<Integer, Integer>() {
+				@Override
+				public Integer map(Integer value) throws Exception {
+					return value;
+				}
+			})
+			.print();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+
+		JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
+		JobVertex mapPrintVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+
+		StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
+		StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());
+		Map<Integer, StreamConfig> chainedConfigs = mapConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader());
+		StreamConfig printConfig = chainedConfigs.get(3);
+
+		assertTrue(sourceConfig.isChainStart());
+		assertTrue(sourceConfig.isChainEnd());
+
+		assertTrue(mapConfig.isChainStart());
+		assertFalse(mapConfig.isChainEnd());
+
+		assertFalse(printConfig.isChainStart());
+		assertTrue(printConfig.isChainEnd());
+	}
 }