You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/12 07:44:44 UTC
flink git commit: [FLINK-9809] [DataSteam API] Allow setting
co-location constraints on StreamTransformations.
Repository: flink
Updated Branches:
refs/heads/master 3c4e59a7f -> 3f0b9fee5
[FLINK-9809] [DataSteam API] Allow setting co-location constraints on StreamTransformations.
This feature is currently only exposed on StreamTransformations (internal API) rather
than in the public API, because it is a hidden expert feature.
This closes #6309
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f0b9fee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f0b9fee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f0b9fee
Branch: refs/heads/master
Commit: 3f0b9fee5185fcc7200179459bed31fb4bd08bbf
Parents: 3c4e59a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 11 17:48:10 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 12 09:38:43 2018 +0200
----------------------------------------------------------------------
.../flink/streaming/api/graph/StreamGraph.java | 22 ++--
.../api/graph/StreamGraphGenerator.java | 7 +-
.../flink/streaming/api/graph/StreamNode.java | 14 +++
.../api/graph/StreamingJobGraphGenerator.java | 43 ++++++--
.../transformations/StreamTransformation.java | 34 +++++++
.../StreamGraphCoLocationConstraintTest.java | 102 +++++++++++++++++++
.../runtime/tasks/OneInputStreamTaskTest.java | 2 +
.../runtime/tasks/StreamConfigChainer.java | 8 +-
.../runtime/tasks/StreamTaskTestHarness.java | 4 +-
.../tasks/TwoInputStreamTaskTestHarness.java | 4 +-
10 files changed, 215 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index e5ed0c2..01768ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -53,6 +53,8 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -163,38 +165,41 @@ public class StreamGraph extends StreamingPlan {
public <IN, OUT> void addSource(Integer vertexID,
String slotSharingGroup,
+ @Nullable String coLocationGroup,
StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
- addOperator(vertexID, slotSharingGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
+ addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
sources.add(vertexID);
}
public <IN, OUT> void addSink(Integer vertexID,
String slotSharingGroup,
+ @Nullable String coLocationGroup,
StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
- addOperator(vertexID, slotSharingGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
+ addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
sinks.add(vertexID);
}
public <IN, OUT> void addOperator(
Integer vertexID,
String slotSharingGroup,
+ @Nullable String coLocationGroup,
StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
if (operatorObject instanceof StoppableStreamSource) {
- addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
+ addNode(vertexID, slotSharingGroup, coLocationGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
} else if (operatorObject instanceof StreamSource) {
- addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
+ addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorObject, operatorName);
} else {
- addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
+ addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorObject, operatorName);
}
TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;
@@ -223,13 +228,14 @@ public class StreamGraph extends StreamingPlan {
public <IN1, IN2, OUT> void addCoOperator(
Integer vertexID,
String slotSharingGroup,
+ @Nullable String coLocationGroup,
TwoInputStreamOperator<IN1, IN2, OUT> taskOperatorObject,
TypeInformation<IN1> in1TypeInfo,
TypeInformation<IN2> in2TypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
- addNode(vertexID, slotSharingGroup, TwoInputStreamTask.class, taskOperatorObject, operatorName);
+ addNode(vertexID, slotSharingGroup, coLocationGroup, TwoInputStreamTask.class, taskOperatorObject, operatorName);
TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ?
outTypeInfo.createSerializer(executionConfig) : null;
@@ -250,6 +256,7 @@ public class StreamGraph extends StreamingPlan {
protected StreamNode addNode(Integer vertexID,
String slotSharingGroup,
+ @Nullable String coLocationGroup,
Class<? extends AbstractInvokable> vertexClass,
StreamOperator<?> operatorObject,
String operatorName) {
@@ -261,6 +268,7 @@ public class StreamGraph extends StreamingPlan {
StreamNode vertex = new StreamNode(environment,
vertexID,
slotSharingGroup,
+ coLocationGroup,
operatorObject,
operatorName,
new ArrayList<OutputSelector<?>>(),
@@ -593,6 +601,7 @@ public class StreamGraph extends StreamingPlan {
ResourceSpec preferredResources) {
StreamNode source = this.addNode(sourceId,
null,
+ null,
StreamIterationHead.class,
null,
"IterationSource-" + loopId);
@@ -603,6 +612,7 @@ public class StreamGraph extends StreamingPlan {
StreamNode sink = this.addNode(sinkId,
null,
+ null,
StreamIterationTail.class,
null,
"IterationSink-" + loopId);
http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 11a7002..2c4ae4a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -466,9 +466,11 @@ public class StreamGraphGenerator {
* Transforms a {@code SourceTransformation}.
*/
private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
- String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), new ArrayList<Integer>());
+ String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());
+
streamGraph.addSource(source.getId(),
slotSharingGroup,
+ source.getCoLocationGroupKey(),
source.getOperator(),
null,
source.getOutputType(),
@@ -493,6 +495,7 @@ public class StreamGraphGenerator {
streamGraph.addSink(sink.getId(),
slotSharingGroup,
+ sink.getCoLocationGroupKey(),
sink.getOperator(),
sink.getInput().getOutputType(),
null,
@@ -535,6 +538,7 @@ public class StreamGraphGenerator {
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
+ transform.getCoLocationGroupKey(),
transform.getOperator(),
transform.getInputType(),
transform.getOutputType(),
@@ -580,6 +584,7 @@ public class StreamGraphGenerator {
streamGraph.addCoOperator(
transform.getId(),
slotSharingGroup,
+ transform.getCoLocationGroupKey(),
transform.getOperator(),
transform.getInputType1(),
transform.getInputType2(),
http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 78ab877..fe12662 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -28,6 +28,8 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -54,6 +56,7 @@ public class StreamNode implements Serializable {
private Long bufferTimeout = null;
private final String operatorName;
private String slotSharingGroup;
+ private @Nullable String coLocationGroup;
private KeySelector<?, ?> statePartitioner1;
private KeySelector<?, ?> statePartitioner2;
private TypeSerializer<?> stateKeySerializer;
@@ -77,10 +80,12 @@ public class StreamNode implements Serializable {
public StreamNode(StreamExecutionEnvironment env,
Integer id,
String slotSharingGroup,
+ @Nullable String coLocationGroup,
StreamOperator<?> operator,
String operatorName,
List<OutputSelector<?>> outputSelector,
Class<? extends AbstractInvokable> jobVertexClass) {
+
this.env = env;
this.id = id;
this.operatorName = operatorName;
@@ -88,6 +93,7 @@ public class StreamNode implements Serializable {
this.outputSelectors = outputSelector;
this.jobVertexClass = jobVertexClass;
this.slotSharingGroup = slotSharingGroup;
+ this.coLocationGroup = coLocationGroup;
}
public void addInEdge(StreamEdge inEdge) {
@@ -253,6 +259,14 @@ public class StreamNode implements Serializable {
return slotSharingGroup;
}
+ public void setCoLocationGroup(@Nullable String coLocationGroup) {
+ this.coLocationGroup = coLocationGroup;
+ }
+
+ public @Nullable String getCoLocationGroup() {
+ return coLocationGroup;
+ }
+
public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
return (slotSharingGroup == null && downstreamVertex.slotSharingGroup == null) ||
(slotSharingGroup != null && slotSharingGroup.equals(downstreamVertex.slotSharingGroup));
http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5b8254d..603b9e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -152,7 +152,7 @@ public class StreamingJobGraphGenerator {
setPhysicalEdges();
- setSlotSharing();
+ setSlotSharingAndCoLocation();
configureCheckpointing();
@@ -531,20 +531,43 @@ public class StreamingJobGraphGenerator {
&& streamGraph.isChainingEnabled();
}
- private void setSlotSharing() {
-
- Map<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();
+ private void setSlotSharingAndCoLocation() {
+ final HashMap<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();
+ final HashMap<String, Tuple2<SlotSharingGroup, CoLocationGroup>> coLocationGroups = new HashMap<>();
for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
- String slotSharingGroup = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();
+ final StreamNode node = streamGraph.getStreamNode(entry.getKey());
+ final JobVertex vertex = entry.getValue();
+
+ // configure slot sharing group
+ final String slotSharingGroupKey = node.getSlotSharingGroup();
+ final SlotSharingGroup sharingGroup;
+
+ if (slotSharingGroupKey != null) {
+ sharingGroup = slotSharingGroups.computeIfAbsent(
+ slotSharingGroupKey, (k) -> new SlotSharingGroup());
+ vertex.setSlotSharingGroup(sharingGroup);
+ } else {
+ sharingGroup = null;
+ }
+
+ // configure co-location constraint
+ final String coLocationGroupKey = node.getCoLocationGroup();
+ if (coLocationGroupKey != null) {
+ if (sharingGroup == null) {
+ throw new IllegalStateException("Cannot use a co-location constraint without a slot sharing group");
+ }
+
+ Tuple2<SlotSharingGroup, CoLocationGroup> constraint = coLocationGroups.computeIfAbsent(
+ coLocationGroupKey, (k) -> new Tuple2<>(sharingGroup, new CoLocationGroup()));
+
+ if (constraint.f0 != sharingGroup) {
+ throw new IllegalStateException("Cannot co-locate operators from different slot sharing groups");
+ }
- SlotSharingGroup group = slotSharingGroups.get(slotSharingGroup);
- if (group == null) {
- group = new SlotSharingGroup();
- slotSharingGroups.put(slotSharingGroup, group);
+ vertex.updateCoLocationGroup(constraint.f1);
}
- entry.getValue().setSlotSharingGroup(group);
}
for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 1f763bb..bfbe78d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
import java.util.Collection;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -151,6 +153,9 @@ public abstract class StreamTransformation<T> {
private String slotSharingGroup;
+ @Nullable
+ private String coLocationGroupKey;
+
/**
* Creates a new {@code StreamTransformation} with the given name, output type and parallelism.
*
@@ -345,6 +350,35 @@ public abstract class StreamTransformation<T> {
}
/**
+ * <b>NOTE:</b> This is an internal undocumented feature for now. It is not
+ * clear whether this will be supported and stable in the long term.
+ *
+ * <p>Sets the key that identifies the co-location group.
+ * Operators with the same co-location key will have their corresponding subtasks
+ * placed into the same slot by the scheduler.
+ *
+ * <p>Setting this to null means there is no co-location constraint.
+ */
+ public void setCoLocationGroupKey(@Nullable String coLocationGroupKey) {
+ this.coLocationGroupKey = coLocationGroupKey;
+ }
+
+ /**
+ * <b>NOTE:</b> This is an internal undocumented feature for now. It is not
+ * clear whether this will be supported and stable in the long term.
+ *
+ * <p>Gets the key that identifies the co-location group.
+ * Operators with the same co-location key will have their corresponding subtasks
+ * placed into the same slot by the scheduler.
+ *
+ * <p>If this is null (which is the default), it means there is no co-location constraint.
+ */
+ @Nullable
+ public String getCoLocationGroupKey() {
+ return coLocationGroupKey;
+ }
+
+ /**
* Tries to fill in the type information. Type information can be filled in
* later when the program uses a type hint. This method checks whether the
* type information has ever been accessed before and does not allow
http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
new file mode 100644
index 0000000..a225681
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.graph;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Test that check the hidden API to set co location constraints on the
+ * stream transformations.
+ */
+public class StreamGraphCoLocationConstraintTest {
+
+ @Test
+ public void testSettingCoLocationConstraint() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(7);
+
+ // set up the test program
+ DataStream<Long> source = env.generateSequence(1L, 10_000_000);
+ source.getTransformation().setCoLocationGroupKey("group1");
+
+ DataStream<Long> step1 = source.keyBy(v -> v).map(v -> v);
+ step1.getTransformation().setCoLocationGroupKey("group2");
+
+ DataStream<Long> step2 = step1.keyBy(v -> v).map(v -> v);
+ step2.getTransformation().setCoLocationGroupKey("group1");
+
+ DataStreamSink<Long> result = step2.keyBy(v -> v).addSink(new DiscardingSink<>());
+ result.getTransformation().setCoLocationGroupKey("group2");
+
+ // get the graph
+ final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ assertEquals(4, jobGraph.getNumberOfVertices());
+
+ List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+ for (JobVertex vertex : vertices) {
+ assertNotNull(vertex.getCoLocationGroup());
+ }
+
+ assertEquals(vertices.get(0).getCoLocationGroup(), vertices.get(2).getCoLocationGroup());
+ assertEquals(vertices.get(1).getCoLocationGroup(), vertices.get(3).getCoLocationGroup());
+ }
+
+ @Test
+ public void testCoLocateDifferenSharingGroups() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(7);
+
+ // set up the test program
+ DataStream<Long> source = env.generateSequence(1L, 10_000_000);
+ source.getTransformation().setSlotSharingGroup("ssg1");
+ source.getTransformation().setCoLocationGroupKey("co1");
+
+ DataStream<Long> step1 = source.keyBy(v -> v).map(v -> v);
+ step1.getTransformation().setSlotSharingGroup("ssg2");
+ step1.getTransformation().setCoLocationGroupKey("co2");
+
+ DataStream<Long> step2 = step1.keyBy(v -> v).map(v -> v);
+ step2.getTransformation().setSlotSharingGroup("ssg3");
+ step2.getTransformation().setCoLocationGroupKey("co1");
+
+ DataStreamSink<Long> result = step2.keyBy(v -> v).addSink(new DiscardingSink<>());
+ result.getTransformation().setSlotSharingGroup("ssg4");
+ result.getTransformation().setCoLocationGroupKey("co2");
+
+ // get the graph
+ try {
+ env.getStreamGraph().getJobGraph();
+ fail("exception expected");
+ }
+ catch (IllegalStateException ignored) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 201e138..96eaa78 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -793,6 +793,7 @@ public class OneInputStreamTaskTest extends TestLogger {
null,
null,
null,
+ null,
null
),
new StreamNode(
@@ -802,6 +803,7 @@ public class OneInputStreamTaskTest extends TestLogger {
null,
null,
null,
+ null,
null
),
0,
http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 74898a4..10e50ce 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -76,8 +76,8 @@ public class StreamConfigChainer {
tailConfig.setChainedOutputs(Collections.singletonList(
new StreamEdge(
- new StreamNode(null, tailConfig.getChainIndex(), null, null, null, null, null),
- new StreamNode(null, chainIndex, null, null, null, null, null),
+ new StreamNode(null, tailConfig.getChainIndex(), null, null, null, null, null, null),
+ new StreamNode(null, chainIndex, null, null, null, null, null, null),
0,
Collections.<String>emptyList(),
null,
@@ -99,8 +99,8 @@ public class StreamConfigChainer {
List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
outEdgesInOrder.add(
new StreamEdge(
- new StreamNode(null, chainIndex, null, null, null, null, null),
- new StreamNode(null, chainIndex , null, null, null, null, null),
+ new StreamNode(null, chainIndex, null, null, null, null, null, null),
+ new StreamNode(null, chainIndex , null, null, null, null, null, null),
0,
Collections.<String>emptyList(),
new BroadcastPartitioner<Object>(),
http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 08032bd..b2f1b99 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -176,8 +176,8 @@ public class StreamTaskTestHarness<OUT> {
};
List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
- StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
- StreamNode targetVertexDummy = new StreamNode(null, 1, "group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+ StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+ StreamNode targetVertexDummy = new StreamNode(null, 1, "group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */));
http://git-wip-us.apache.org/repos/asf/flink/blob/3f0b9fee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index 78e6de2..4c1c424 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -116,8 +116,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
private static final long serialVersionUID = 1L;
};
- StreamNode sourceVertexDummy = new StreamNode(null, 0, "default group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
- StreamNode targetVertexDummy = new StreamNode(null, 1, "default group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+ StreamNode sourceVertexDummy = new StreamNode(null, 0, "default group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+ StreamNode targetVertexDummy = new StreamNode(null, 1, "default group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
for (int i = 0; i < numInputGates; i++) {