You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/12 07:44:44 UTC

flink git commit: [FLINK-9809] [DataSteam API] Allow setting co-location constraints on StreamTransformations.

Repository: flink
Updated Branches:
  refs/heads/master 3c4e59a7f -> 3f0b9fee5


[FLINK-9809] [DataSteam API] Allow setting co-location constraints on StreamTransformations.

This feature is currently only exposed on StreamTransformations (internal API) rather
than in the public API, because it is a hidden expert feature.

This closes #6309


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f0b9fee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f0b9fee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f0b9fee

Branch: refs/heads/master
Commit: 3f0b9fee5185fcc7200179459bed31fb4bd08bbf
Parents: 3c4e59a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 11 17:48:10 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 12 09:38:43 2018 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/graph/StreamGraph.java  |  22 ++--
 .../api/graph/StreamGraphGenerator.java         |   7 +-
 .../flink/streaming/api/graph/StreamNode.java   |  14 +++
 .../api/graph/StreamingJobGraphGenerator.java   |  43 ++++++--
 .../transformations/StreamTransformation.java   |  34 +++++++
 .../StreamGraphCoLocationConstraintTest.java    | 102 +++++++++++++++++++
 .../runtime/tasks/OneInputStreamTaskTest.java   |   2 +
 .../runtime/tasks/StreamConfigChainer.java      |   8 +-
 .../runtime/tasks/StreamTaskTestHarness.java    |   4 +-
 .../tasks/TwoInputStreamTaskTestHarness.java    |   4 +-
 10 files changed, 215 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index e5ed0c2..01768ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -53,6 +53,8 @@ import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -163,38 +165,41 @@ public class StreamGraph extends StreamingPlan {
 
 	public <IN, OUT> void addSource(Integer vertexID,
 		String slotSharingGroup,
+		@Nullable String coLocationGroup,
 		StreamOperator<OUT> operatorObject,
 		TypeInformation<IN> inTypeInfo,
 		TypeInformation<OUT> outTypeInfo,
 		String operatorName) {
-		addOperator(vertexID, slotSharingGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
+		addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
 		sources.add(vertexID);
 	}
 
 	public <IN, OUT> void addSink(Integer vertexID,
 		String slotSharingGroup,
+		@Nullable String coLocationGroup,
 		StreamOperator<OUT> operatorObject,
 		TypeInformation<IN> inTypeInfo,
 		TypeInformation<OUT> outTypeInfo,
 		String operatorName) {
-		addOperator(vertexID, slotSharingGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
+		addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
 		sinks.add(vertexID);
 	}
 
 	public <IN, OUT> void addOperator(
 			Integer vertexID,
 			String slotSharingGroup,
+			@Nullable String coLocationGroup,
 			StreamOperator<OUT> operatorObject,
 			TypeInformation<IN> inTypeInfo,
 			TypeInformation<OUT> outTypeInfo,
 			String operatorName) {
 
 		if (operatorObject instanceof StoppableStreamSource) {
-			addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
+			addNode(vertexID, slotSharingGroup, coLocationGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
 		} else if (operatorObject instanceof StreamSource) {
-			addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
+			addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorObject, operatorName);
 		} else {
-			addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
+			addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorObject, operatorName);
 		}
 
 		TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;
@@ -223,13 +228,14 @@ public class StreamGraph extends StreamingPlan {
 	public <IN1, IN2, OUT> void addCoOperator(
 			Integer vertexID,
 			String slotSharingGroup,
+			@Nullable String coLocationGroup,
 			TwoInputStreamOperator<IN1, IN2, OUT> taskOperatorObject,
 			TypeInformation<IN1> in1TypeInfo,
 			TypeInformation<IN2> in2TypeInfo,
 			TypeInformation<OUT> outTypeInfo,
 			String operatorName) {
 
-		addNode(vertexID, slotSharingGroup, TwoInputStreamTask.class, taskOperatorObject, operatorName);
+		addNode(vertexID, slotSharingGroup, coLocationGroup, TwoInputStreamTask.class, taskOperatorObject, operatorName);
 
 		TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ?
 				outTypeInfo.createSerializer(executionConfig) : null;
@@ -250,6 +256,7 @@ public class StreamGraph extends StreamingPlan {
 
 	protected StreamNode addNode(Integer vertexID,
 		String slotSharingGroup,
+		@Nullable String coLocationGroup,
 		Class<? extends AbstractInvokable> vertexClass,
 		StreamOperator<?> operatorObject,
 		String operatorName) {
@@ -261,6 +268,7 @@ public class StreamGraph extends StreamingPlan {
 		StreamNode vertex = new StreamNode(environment,
 			vertexID,
 			slotSharingGroup,
+			coLocationGroup,
 			operatorObject,
 			operatorName,
 			new ArrayList<OutputSelector<?>>(),
@@ -593,6 +601,7 @@ public class StreamGraph extends StreamingPlan {
 		ResourceSpec preferredResources) {
 		StreamNode source = this.addNode(sourceId,
 			null,
+			null,
 			StreamIterationHead.class,
 			null,
 			"IterationSource-" + loopId);
@@ -603,6 +612,7 @@ public class StreamGraph extends StreamingPlan {
 
 		StreamNode sink = this.addNode(sinkId,
 			null,
+			null,
 			StreamIterationTail.class,
 			null,
 			"IterationSink-" + loopId);

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 11a7002..2c4ae4a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -466,9 +466,11 @@ public class StreamGraphGenerator {
 	 * Transforms a {@code SourceTransformation}.
 	 */
 	private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
-		String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), new ArrayList<Integer>());
+		String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());
+
 		streamGraph.addSource(source.getId(),
 				slotSharingGroup,
+				source.getCoLocationGroupKey(),
 				source.getOperator(),
 				null,
 				source.getOutputType(),
@@ -493,6 +495,7 @@ public class StreamGraphGenerator {
 
 		streamGraph.addSink(sink.getId(),
 				slotSharingGroup,
+				sink.getCoLocationGroupKey(),
 				sink.getOperator(),
 				sink.getInput().getOutputType(),
 				null,
@@ -535,6 +538,7 @@ public class StreamGraphGenerator {
 
 		streamGraph.addOperator(transform.getId(),
 				slotSharingGroup,
+				transform.getCoLocationGroupKey(),
 				transform.getOperator(),
 				transform.getInputType(),
 				transform.getOutputType(),
@@ -580,6 +584,7 @@ public class StreamGraphGenerator {
 		streamGraph.addCoOperator(
 				transform.getId(),
 				slotSharingGroup,
+				transform.getCoLocationGroupKey(),
 				transform.getOperator(),
 				transform.getInputType1(),
 				transform.getInputType2(),

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 78ab877..fe12662 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -28,6 +28,8 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -54,6 +56,7 @@ public class StreamNode implements Serializable {
 	private Long bufferTimeout = null;
 	private final String operatorName;
 	private String slotSharingGroup;
+	private @Nullable String coLocationGroup;
 	private KeySelector<?, ?> statePartitioner1;
 	private KeySelector<?, ?> statePartitioner2;
 	private TypeSerializer<?> stateKeySerializer;
@@ -77,10 +80,12 @@ public class StreamNode implements Serializable {
 	public StreamNode(StreamExecutionEnvironment env,
 		Integer id,
 		String slotSharingGroup,
+		@Nullable String coLocationGroup,
 		StreamOperator<?> operator,
 		String operatorName,
 		List<OutputSelector<?>> outputSelector,
 		Class<? extends AbstractInvokable> jobVertexClass) {
+
 		this.env = env;
 		this.id = id;
 		this.operatorName = operatorName;
@@ -88,6 +93,7 @@ public class StreamNode implements Serializable {
 		this.outputSelectors = outputSelector;
 		this.jobVertexClass = jobVertexClass;
 		this.slotSharingGroup = slotSharingGroup;
+		this.coLocationGroup = coLocationGroup;
 	}
 
 	public void addInEdge(StreamEdge inEdge) {
@@ -253,6 +259,14 @@ public class StreamNode implements Serializable {
 		return slotSharingGroup;
 	}
 
+	public void setCoLocationGroup(@Nullable String coLocationGroup) {
+		this.coLocationGroup = coLocationGroup;
+	}
+
+	public @Nullable String getCoLocationGroup() {
+		return coLocationGroup;
+	}
+
 	public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
 		return (slotSharingGroup == null && downstreamVertex.slotSharingGroup == null) ||
 				(slotSharingGroup != null && slotSharingGroup.equals(downstreamVertex.slotSharingGroup));

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5b8254d..603b9e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -152,7 +152,7 @@ public class StreamingJobGraphGenerator {
 
 		setPhysicalEdges();
 
-		setSlotSharing();
+		setSlotSharingAndCoLocation();
 
 		configureCheckpointing();
 
@@ -531,20 +531,43 @@ public class StreamingJobGraphGenerator {
 				&& streamGraph.isChainingEnabled();
 	}
 
-	private void setSlotSharing() {
-
-		Map<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();
+	private void setSlotSharingAndCoLocation() {
+		final HashMap<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();
+		final HashMap<String, Tuple2<SlotSharingGroup, CoLocationGroup>> coLocationGroups = new HashMap<>();
 
 		for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
 
-			String slotSharingGroup = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();
+			final StreamNode node = streamGraph.getStreamNode(entry.getKey());
+			final JobVertex vertex = entry.getValue();
+
+			// configure slot sharing group
+			final String slotSharingGroupKey = node.getSlotSharingGroup();
+			final SlotSharingGroup sharingGroup;
+
+			if (slotSharingGroupKey != null) {
+				sharingGroup = slotSharingGroups.computeIfAbsent(
+						slotSharingGroupKey, (k) -> new SlotSharingGroup());
+				vertex.setSlotSharingGroup(sharingGroup);
+			} else {
+				sharingGroup = null;
+			}
+
+			// configure co-location constraint
+			final String coLocationGroupKey = node.getCoLocationGroup();
+			if (coLocationGroupKey != null) {
+				if (sharingGroup == null) {
+					throw new IllegalStateException("Cannot use a co-location constraint without a slot sharing group");
+				}
+
+				Tuple2<SlotSharingGroup, CoLocationGroup> constraint = coLocationGroups.computeIfAbsent(
+						coLocationGroupKey, (k) -> new Tuple2<>(sharingGroup, new CoLocationGroup()));
+
+				if (constraint.f0 != sharingGroup) {
+					throw new IllegalStateException("Cannot co-locate operators from different slot sharing groups");
+				}
 
-			SlotSharingGroup group = slotSharingGroups.get(slotSharingGroup);
-			if (group == null) {
-				group = new SlotSharingGroup();
-				slotSharingGroups.put(slotSharingGroup, group);
+				vertex.updateCoLocationGroup(constraint.f1);
 			}
-			entry.getValue().setSlotSharingGroup(group);
 		}
 
 		for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 1f763bb..bfbe78d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -151,6 +153,9 @@ public abstract class StreamTransformation<T> {
 
 	private String slotSharingGroup;
 
+	@Nullable
+	private String coLocationGroupKey;
+
 	/**
 	 * Creates a new {@code StreamTransformation} with the given name, output type and parallelism.
 	 *
@@ -345,6 +350,35 @@ public abstract class StreamTransformation<T> {
 	}
 
 	/**
+	 * <b>NOTE:</b> This is an internal undocumented feature for now. It is not
+	 * clear whether this will be supported and stable in the long term.
+	 *
+	 * <p>Sets the key that identifies the co-location group.
+	 * Operators with the same co-location key will have their corresponding subtasks
+	 * placed into the same slot by the scheduler.
+	 *
+	 * <p>Setting this to null means there is no co-location constraint.
+	 */
+	public void setCoLocationGroupKey(@Nullable String coLocationGroupKey) {
+		this.coLocationGroupKey = coLocationGroupKey;
+	}
+
+	/**
+	 * <b>NOTE:</b> This is an internal undocumented feature for now. It is not
+	 * clear whether this will be supported and stable in the long term.
+	 *
+	 * <p>Gets the key that identifies the co-location group.
+	 * Operators with the same co-location key will have their corresponding subtasks
+	 * placed into the same slot by the scheduler.
+	 *
+	 * <p>If this is null (which is the default), it means there is no co-location constraint.
+	 */
+	@Nullable
+	public String getCoLocationGroupKey() {
+		return coLocationGroupKey;
+	}
+
+	/**
 	 * Tries to fill in the type information. Type information can be filled in
 	 * later when the program uses a type hint. This method checks whether the
 	 * type information has ever been accessed before and does not allow

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
new file mode 100644
index 0000000..a225681
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.graph;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Test that check the hidden API to set co location constraints on the
+ * stream transformations.
+ */
+public class StreamGraphCoLocationConstraintTest {
+
+	@Test
+	public void testSettingCoLocationConstraint() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(7);
+
+		// set up the test program
+		DataStream<Long> source = env.generateSequence(1L, 10_000_000);
+		source.getTransformation().setCoLocationGroupKey("group1");
+
+		DataStream<Long> step1 = source.keyBy(v -> v).map(v -> v);
+		step1.getTransformation().setCoLocationGroupKey("group2");
+
+		DataStream<Long> step2 = step1.keyBy(v -> v).map(v -> v);
+		step2.getTransformation().setCoLocationGroupKey("group1");
+
+		DataStreamSink<Long> result = step2.keyBy(v -> v).addSink(new DiscardingSink<>());
+		result.getTransformation().setCoLocationGroupKey("group2");
+
+		// get the graph
+		final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+		assertEquals(4, jobGraph.getNumberOfVertices());
+
+		List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+		for (JobVertex vertex : vertices) {
+			assertNotNull(vertex.getCoLocationGroup());
+		}
+
+		assertEquals(vertices.get(0).getCoLocationGroup(), vertices.get(2).getCoLocationGroup());
+		assertEquals(vertices.get(1).getCoLocationGroup(), vertices.get(3).getCoLocationGroup());
+	}
+
+	@Test
+	public void testCoLocateDifferenSharingGroups() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(7);
+
+		// set up the test program
+		DataStream<Long> source = env.generateSequence(1L, 10_000_000);
+		source.getTransformation().setSlotSharingGroup("ssg1");
+		source.getTransformation().setCoLocationGroupKey("co1");
+
+		DataStream<Long> step1 = source.keyBy(v -> v).map(v -> v);
+		step1.getTransformation().setSlotSharingGroup("ssg2");
+		step1.getTransformation().setCoLocationGroupKey("co2");
+
+		DataStream<Long> step2 = step1.keyBy(v -> v).map(v -> v);
+		step2.getTransformation().setSlotSharingGroup("ssg3");
+		step2.getTransformation().setCoLocationGroupKey("co1");
+
+		DataStreamSink<Long> result = step2.keyBy(v -> v).addSink(new DiscardingSink<>());
+		result.getTransformation().setSlotSharingGroup("ssg4");
+		result.getTransformation().setCoLocationGroupKey("co2");
+
+		// get the graph
+		try {
+			env.getStreamGraph().getJobGraph();
+			fail("exception expected");
+		}
+		catch (IllegalStateException ignored) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 201e138..96eaa78 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -793,6 +793,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 					null,
 					null,
 					null,
+					null,
 					null
 				),
 				new StreamNode(
@@ -802,6 +803,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 					null,
 					null,
 					null,
+					null,
 					null
 				),
 				0,

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 74898a4..10e50ce 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -76,8 +76,8 @@ public class StreamConfigChainer {
 
 		tailConfig.setChainedOutputs(Collections.singletonList(
 			new StreamEdge(
-				new StreamNode(null, tailConfig.getChainIndex(), null, null, null, null, null),
-				new StreamNode(null, chainIndex, null, null, null, null, null),
+				new StreamNode(null, tailConfig.getChainIndex(), null, null, null, null, null, null),
+				new StreamNode(null, chainIndex, null, null, null, null, null, null),
 				0,
 				Collections.<String>emptyList(),
 				null,
@@ -99,8 +99,8 @@ public class StreamConfigChainer {
 		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
 		outEdgesInOrder.add(
 			new StreamEdge(
-				new StreamNode(null, chainIndex, null, null, null, null, null),
-				new StreamNode(null, chainIndex , null, null, null, null, null),
+				new StreamNode(null, chainIndex, null, null, null, null, null, null),
+				new StreamNode(null, chainIndex , null, null, null, null, null, null),
 				0,
 				Collections.<String>emptyList(),
 				new BroadcastPartitioner<Object>(),

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 08032bd..b2f1b99 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -176,8 +176,8 @@ public class StreamTaskTestHarness<OUT> {
 		};
 
 		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
-		StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-		StreamNode targetVertexDummy = new StreamNode(null, 1, "group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+		StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+		StreamNode targetVertexDummy = new StreamNode(null, 1, "group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
 
 		outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index 78e6de2..4c1c424 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -116,8 +116,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
 			private static final long serialVersionUID = 1L;
 		};
 
-		StreamNode sourceVertexDummy = new StreamNode(null, 0, "default group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-		StreamNode targetVertexDummy = new StreamNode(null, 1, "default group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+		StreamNode sourceVertexDummy = new StreamNode(null, 0, "default group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+		StreamNode targetVertexDummy = new StreamNode(null, 1, "default group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
 
 		for (int i = 0; i < numInputGates; i++) {