You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 15:28:26 UTC

[flink] 01/02: [FLINK-19083] Remove deprecated DataStream#split

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a08548a209167cafeeba1ce602fe8d542994be5
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Sep 4 11:37:47 2020 +0200

    [FLINK-19083] Remove deprecated DataStream#split
    
    This closes #13343
---
 .../examples/iteration/IterateExample.java         |  58 ++++----
 flink-streaming-java/pom.xml                       |   4 +
 .../collector/selector/CopyingDirectedOutput.java  |  67 ---------
 .../api/collector/selector/DirectedOutput.java     | 163 ---------------------
 .../api/collector/selector/OutputSelector.java     |  46 ------
 .../flink/streaming/api/datastream/DataStream.java |  23 +--
 .../streaming/api/datastream/IterativeStream.java  |   4 +-
 .../api/datastream/SingleOutputStreamOperator.java |  19 ---
 .../streaming/api/datastream/SplitStream.java      |  67 ---------
 .../flink/streaming/api/graph/StreamConfig.java    |  22 ---
 .../flink/streaming/api/graph/StreamEdge.java      |  22 +--
 .../flink/streaming/api/graph/StreamGraph.java     |  54 +------
 .../streaming/api/graph/StreamGraphGenerator.java  |  72 ---------
 .../flink/streaming/api/graph/StreamNode.java      |  15 +-
 .../api/graph/StreamingJobGraphGenerator.java      |   1 -
 .../api/transformations/SelectTransformation.java  |  82 -----------
 .../api/transformations/SplitTransformation.java   |  82 -----------
 .../streaming/runtime/tasks/OperatorChain.java     |  47 ++----
 .../apache/flink/streaming/api/DataStreamTest.java | 113 --------------
 .../api/collector/OutputSelectorTest.java          |  63 --------
 .../api/datastream/SplitSideOutputTest.java        |  78 ----------
 .../api/graph/StreamGraphGeneratorTest.java        | 115 +++------------
 .../operators/StreamOperatorChainingTest.java      |  26 ++--
 .../runtime/tasks/OneInputStreamTaskTest.java      |   3 -
 .../runtime/tasks/StreamConfigChainer.java         |  14 +-
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |   8 +-
 .../runtime/tasks/StreamTaskTestHarness.java       |   7 +-
 .../tasks/TwoInputStreamTaskTestHarness.java       |   6 +-
 .../streaming/util/EvenOddOutputSelector.java      |  35 -----
 .../flink/streaming/util/MockStreamConfig.java     |   8 +-
 flink-streaming-scala/pom.xml                      |   5 +
 .../flink/streaming/api/scala/DataStream.scala     |  34 +----
 .../flink/streaming/api/scala/SplitStream.scala    |  40 -----
 .../apache/flink/streaming/api/scala/package.scala |   8 -
 .../flink/streaming/api/scala/DataStreamTest.scala |  29 ----
 .../scala/StreamingScalaAPICompletenessTest.scala  |  13 +-
 .../streaming/runtime/DirectedOutputITCase.java    |  93 ------------
 .../test/streaming/runtime/IterateITCase.java      |  46 ++++--
 .../streaming/runtime/OutputSplitterITCase.java    | 147 -------------------
 .../runtime/util/EvenOddOutputSelector.java        |  35 -----
 40 files changed, 151 insertions(+), 1623 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index ecf57ee..7f80d3b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -21,15 +21,15 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Random;
 
 /**
@@ -50,6 +50,9 @@ public class IterateExample {
 
 	private static final int BOUND = 100;
 
+	private static final OutputTag<Tuple5<Integer, Integer, Integer, Integer, Integer>> ITERATE_TAG =
+		new OutputTag<Tuple5<Integer, Integer, Integer, Integer, Integer>>("iterate") {};
+
 	// *************************************************************************
 	// PROGRAM
 	// *************************************************************************
@@ -84,19 +87,16 @@ public class IterateExample {
 				.iterate(5000L);
 
 		// apply the step function to get the next Fibonacci number
-		// increment the counter and split the output with the output selector
-		SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step())
-				.split(new MySelector());
+		// increment the counter and split the output
+		SingleOutputStreamOperator<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.process(new Step());
 
 		// close the iteration by selecting the tuples that were directed to the
 		// 'iterate' channel in the output selector
-		it.closeWith(step.select("iterate"));
+		it.closeWith(step.getSideOutput(ITERATE_TAG));
 
-		// to produce the final output select the tuples directed to the
-		// 'output' channel then get the input pairs that have the greatest iteration counter
+		// to produce the final get the input pairs that have the greatest iteration counter
 		// on a 1 second sliding window
-		DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
-				.map(new OutputMap());
+		DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.map(new OutputMap());
 
 		// emit results
 		if (params.has("output")) {
@@ -176,33 +176,27 @@ public class IterateExample {
 	/**
 	 * Iteration step function that calculates the next Fibonacci number.
 	 */
-	public static class Step implements
-			MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer,
-					Integer, Integer>> {
+	public static class Step
+		extends ProcessFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
-				Integer> value) throws Exception {
-			return new Tuple5<>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);
-		}
-	}
-
-	/**
-	 * OutputSelector testing which tuple needs to be iterated again.
-	 */
-	public static class MySelector implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
-		private static final long serialVersionUID = 1L;
+		public void processElement(
+			Tuple5<Integer, Integer, Integer, Integer, Integer> value,
+			Context ctx,
+			Collector<Tuple5<Integer, Integer, Integer, Integer, Integer>> out) throws Exception {
+			Tuple5<Integer, Integer, Integer, Integer, Integer> element = new Tuple5<>(
+				value.f0,
+				value.f1,
+				value.f3,
+				value.f2 + value.f3,
+				++value.f4);
 
-		@Override
-		public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
-			List<String> output = new ArrayList<>();
 			if (value.f2 < BOUND && value.f3 < BOUND) {
-				output.add("iterate");
+				ctx.output(ITERATE_TAG, element);
 			} else {
-				output.add("output");
+				out.collect(element);
 			}
-			return output;
 		}
 	}
 
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index 5966de4..05ea145 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -113,6 +113,10 @@ under the License.
 							<exclude>org.apache.flink.streaming.api.datastream.WindowedStream#fold(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction)</exclude>
 							<exclude>org.apache.flink.streaming.api.datastream.WindowedStream#apply(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.streaming.api.functions.windowing.WindowFunction)</exclude>
 							<exclude>org.apache.flink.streaming.api.datastream.WindowedStream#apply(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.streaming.api.functions.windowing.WindowFunction,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+
+							<!-- DataStream#split was removed in 1.12 -->
+							<exclude>org.apache.flink.streaming.api.datastream.DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)</exclude>
+							<exclude>org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)</exclude>
 						</excludes>
 					</parameter>
 				</configuration>
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
deleted file mode 100644
index 3533cd1..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * Special version of {@link DirectedOutput} that performs a shallow copy of the
- * {@link StreamRecord} to ensure that multi-chaining works correctly.
- */
-public class CopyingDirectedOutput<OUT> extends DirectedOutput<OUT> {
-
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public CopyingDirectedOutput(
-			List<OutputSelector<OUT>> outputSelectors,
-			List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
-		super(outputSelectors, outputs);
-	}
-
-	@Override
-	public void collect(StreamRecord<OUT> record) {
-		Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
-
-		if (selectedOutputs.isEmpty()) {
-			return;
-		}
-
-		Iterator<Output<StreamRecord<OUT>>> it = selectedOutputs.iterator();
-
-		while (true) {
-			Output<StreamRecord<OUT>> out = it.next();
-			if (it.hasNext()) {
-				// we don't have the last output
-				// perform a shallow copy
-				StreamRecord<OUT> shallowCopy = record.copy(record.getValue());
-				out.collect(shallowCopy);
-			} else {
-				// this is the last output
-				out.collect(record);
-				break;
-			}
-		}
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
deleted file mode 100644
index 2adeaf4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
-import org.apache.flink.util.OutputTag;
-import org.apache.flink.util.XORShiftRandom;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * Wrapping {@link Output} that forwards to other {@link Output Outputs } based on a list of
- * {@link OutputSelector OutputSelectors}.
- */
-public class DirectedOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
-
-	protected final OutputSelector<OUT>[] outputSelectors;
-
-	protected final Output<StreamRecord<OUT>>[] selectAllOutputs;
-
-	protected final HashMap<String, Output<StreamRecord<OUT>>[]> outputMap;
-
-	protected final Output<StreamRecord<OUT>>[] allOutputs;
-
-	private final Random random = new XORShiftRandom();
-
-	protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
-
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public DirectedOutput(
-			List<OutputSelector<OUT>> outputSelectors,
-			List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
-		this.outputSelectors = outputSelectors.toArray(new OutputSelector[outputSelectors.size()]);
-
-		this.allOutputs = new Output[outputs.size()];
-		for (int i = 0; i < outputs.size(); i++) {
-			allOutputs[i] = outputs.get(i).f0;
-		}
-
-		HashSet<Output<StreamRecord<OUT>>> selectAllOutputs = new HashSet<Output<StreamRecord<OUT>>>();
-		HashMap<String, ArrayList<Output<StreamRecord<OUT>>>> outputMap = new HashMap<String, ArrayList<Output<StreamRecord<OUT>>>>();
-
-		for (Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge> outputPair : outputs) {
-			final Output<StreamRecord<OUT>> output = outputPair.f0;
-			final StreamEdge edge = outputPair.f1;
-
-			List<String> selectedNames = edge.getSelectedNames();
-
-			if (selectedNames.isEmpty()) {
-				selectAllOutputs.add(output);
-			}
-			else {
-				for (String selectedName : selectedNames) {
-					if (!outputMap.containsKey(selectedName)) {
-						outputMap.put(selectedName, new ArrayList<Output<StreamRecord<OUT>>>());
-						outputMap.get(selectedName).add(output);
-					}
-					else {
-						if (!outputMap.get(selectedName).contains(output)) {
-							outputMap.get(selectedName).add(output);
-						}
-					}
-				}
-			}
-		}
-
-		this.selectAllOutputs = selectAllOutputs.toArray(new Output[selectAllOutputs.size()]);
-
-		this.outputMap = new HashMap<>();
-		for (Map.Entry<String, ArrayList<Output<StreamRecord<OUT>>>> entry : outputMap.entrySet()) {
-			Output<StreamRecord<OUT>>[] arr = entry.getValue().toArray(new Output[entry.getValue().size()]);
-			this.outputMap.put(entry.getKey(), arr);
-		}
-	}
-
-	@Override
-	public void emitWatermark(Watermark mark) {
-		watermarkGauge.setCurrentWatermark(mark.getTimestamp());
-		for (Output<StreamRecord<OUT>> out : allOutputs) {
-			out.emitWatermark(mark);
-		}
-	}
-
-	@Override
-	public void emitLatencyMarker(LatencyMarker latencyMarker) {
-		// randomly select an output
-		allOutputs[random.nextInt(allOutputs.length)].emitLatencyMarker(latencyMarker);
-	}
-
-	protected Set<Output<StreamRecord<OUT>>> selectOutputs(StreamRecord<OUT> record)  {
-		Set<Output<StreamRecord<OUT>>> selectedOutputs = new HashSet<>(selectAllOutputs.length);
-		Collections.addAll(selectedOutputs, selectAllOutputs);
-
-		for (OutputSelector<OUT> outputSelector : outputSelectors) {
-			Iterable<String> outputNames = outputSelector.select(record.getValue());
-
-			for (String outputName : outputNames) {
-				Output<StreamRecord<OUT>>[] outputList = outputMap.get(outputName);
-				if (outputList != null) {
-					Collections.addAll(selectedOutputs, outputList);
-				}
-			}
-		}
-
-		return selectedOutputs;
-	}
-
-	@Override
-	public void collect(StreamRecord<OUT> record) {
-		Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
-
-		for (Output<StreamRecord<OUT>> out : selectedOutputs) {
-			out.collect(record);
-		}
-	}
-
-	@Override
-	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
-		throw new UnsupportedOperationException("Cannot use split/select with side outputs.");
-	}
-
-	@Override
-	public void close() {
-		for (Output<StreamRecord<OUT>> out : allOutputs) {
-			out.close();
-		}
-	}
-
-	@Override
-	public Gauge<Long> getWatermarkGauge() {
-		return watermarkGauge;
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
deleted file mode 100644
index a8433fe..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-
-import java.io.Serializable;
-
-/**
- * Interface for defining an OutputSelector for a {@link SplitStream} using
- * the {@link SingleOutputStreamOperator#split} call. Every output object of a
- * {@link SplitStream} will run through this operator to select outputs.
- *
- * @param <OUT>
- *            Type parameter of the split values.
- */
-@PublicEvolving
-public interface OutputSelector<OUT> extends Serializable {
-	/**
-	 * Method for selecting output names for the emitted objects when using the
-	 * {@link SingleOutputStreamOperator#split} method. The values will be
-	 * emitted only to output names which are contained in the returned
-	 * iterable.
-	 *
-	 * @param value
-	 *            Output object for which the output selection should be made.
-	 */
-	Iterable<String> select(OUT value);
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5a54ea2..8198450 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -51,7 +51,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -97,6 +96,7 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -229,23 +229,6 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Operator used for directing tuples to specific named outputs using an
-	 * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
-	 * Calling this method on an operator creates a new {@link SplitStream}.
-	 *
-	 * @param outputSelector
-	 *            The user defined
-	 *            {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
-	 *            for directing the tuples.
-	 * @return The {@link SplitStream}
-	 * @deprecated Please use side output instead.
-	 */
-	@Deprecated
-	public SplitStream<T> split(OutputSelector<T> outputSelector) {
-		return new SplitStream<>(this, clean(outputSelector));
-	}
-
-	/**
 	 * Creates a new {@link ConnectedStreams} by connecting
 	 * {@link DataStream} outputs of (possible) different types with each other.
 	 * The DataStreams connected using this operator can be used with
@@ -526,7 +509,7 @@ public class DataStream<T> {
 	 *
 	 * <p>A common usage pattern for streaming iterations is to use output
 	 * splitting to send a part of the closing data stream to the head. Refer to
-	 * {@link #split(OutputSelector)} for more information.
+	 * {@link ProcessFunction.Context#output(OutputTag, Object)} for more information.
 	 *
 	 * <p>The iteration edge will be partitioned the same way as the first input of
 	 * the iteration head unless it is changed in the
@@ -558,7 +541,7 @@ public class DataStream<T> {
 	 *
 	 * <p>A common usage pattern for streaming iterations is to use output
 	 * splitting to send a part of the closing data stream to the head. Refer to
-	 * {@link #split(OutputSelector)} for more information.
+	 * {@link ProcessFunction.Context#output(OutputTag, Object)} for more information.
 	 *
 	 * <p>The iteration edge will be partitioned the same way as the first input of
 	 * the iteration head unless it is changed in the
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
index 430241a..228ec3a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -23,8 +23,10 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
 import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
+import org.apache.flink.util.OutputTag;
 
 import java.util.Collection;
 
@@ -54,7 +56,7 @@ public class IterativeStream<T> extends SingleOutputStreamOperator<T> {
 	 *
 	 * <p>A common usage pattern for streaming iterations is to use output
 	 * splitting to send a part of the closing data stream to the head. Refer to
-	 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
+	 * {@link ProcessFunction.Context#output(OutputTag, Object)}
 	 * for more information.
 	 *
 	 * @param feedbackStream
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 6a70803..9f0dc04 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
@@ -57,8 +56,6 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 	 */
 	private Map<OutputTag<?>, TypeInformation<?>> requestedSideOutputs = new HashMap<>();
 
-	private boolean wasSplitApplied = false;
-
 	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, Transformation<T> transformation) {
 		super(environment, transformation);
 	}
@@ -379,17 +376,6 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 		return this;
 	}
 
-	@Override
-	public SplitStream<T> split(OutputSelector<T> outputSelector) {
-		if (requestedSideOutputs.isEmpty()) {
-			wasSplitApplied = true;
-			return super.split(outputSelector);
-		} else {
-			throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
-				"As a work-around, please add a no-op map function before the split() call.");
-		}
-	}
-
 	/**
 	 * Gets the {@link DataStream} that contains the elements that are emitted from an operation
 	 * into the side output with the given {@link OutputTag}.
@@ -397,11 +383,6 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 	 * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
 	 */
 	public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
-		if (wasSplitApplied) {
-			throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
-				"As a work-around, please add a no-op map function before the split() call.");
-		}
-
 		sideOutputTag = clean(requireNonNull(sideOutputTag));
 
 		// make a defensive copy
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
deleted file mode 100644
index 7f28dc7..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.transformations.SelectTransformation;
-import org.apache.flink.streaming.api.transformations.SplitTransformation;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-/**
- * The SplitStream represents an operator that has been split using an
- * {@link OutputSelector}. Named outputs can be selected using the
- * {@link #select} function. To apply transformation on the whole output simply
- * call the transformation on the SplitStream
- *
- * @param <OUT> The type of the elements in the Stream
- */
-
-@Deprecated
-@PublicEvolving
-public class SplitStream<OUT> extends DataStream<OUT> {
-
-	protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
-		super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
-	}
-
-	/**
-	 * Sets the output names for which the next operator will receive values.
-	 *
-	 * @param outputNames
-	 *            The output names for which the operator will receive the
-	 *            input.
-	 * @return Returns the selected DataStream
-	 */
-	public DataStream<OUT> select(String... outputNames) {
-		return selectOutput(outputNames);
-	}
-
-	private DataStream<OUT> selectOutput(String[] outputNames) {
-		for (String outName : outputNames) {
-			if (outName == null) {
-				throw new RuntimeException("Selected names must not be null");
-			}
-		}
-
-		SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
-		return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
-	}
-
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 0a23523..3b76227 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.util.ClassLoaderUtil;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -41,7 +40,6 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -74,7 +72,6 @@ public class StreamConfig implements Serializable {
 	private static final String CHAIN_INDEX = "chainIndex";
 	private static final String VERTEX_NAME = "vertexID";
 	private static final String ITERATION_ID = "iterationId";
-	private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
 	private static final String INPUTS = "inputs";
 	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
 	private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_";
@@ -278,25 +275,6 @@ public class StreamConfig implements Serializable {
 		}
 	}
 
-	public void setOutputSelectors(List<OutputSelector<?>> outputSelectors) {
-		try {
-			InstantiationUtil.writeObjectToConfig(outputSelectors, this.config, OUTPUT_SELECTOR_WRAPPER);
-		} catch (IOException e) {
-			throw new StreamTaskException("Could not serialize output selectors", e);
-		}
-	}
-
-	public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader userCodeClassloader) {
-		try {
-			List<OutputSelector<T>> selectors =
-					InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, userCodeClassloader);
-			return selectors == null ? Collections.<OutputSelector<T>>emptyList() : selectors;
-
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not read output selectors", e);
-		}
-	}
-
 	public void setIterationId(String iterationId) {
 		config.setString(ITERATION_ID, iterationId);
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 70232ce..c4f32c5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.OutputTag;
 
 import java.io.Serializable;
-import java.util.List;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -50,13 +49,6 @@ public class StreamEdge implements Serializable {
 	 * The type number of the input for co-tasks.
 	 */
 	private final int typeNumber;
-
-	/**
-	 * A list of output names that the target vertex listens to (if there is
-	 * output selection).
-	 */
-	private final List<String> selectedNames;
-
 	/**
 	 * The side-output tag (if any) of this {@link StreamEdge}.
 	 */
@@ -85,7 +77,6 @@ public class StreamEdge implements Serializable {
 		StreamNode sourceVertex,
 		StreamNode targetVertex,
 		int typeNumber,
-		List<String> selectedNames,
 		StreamPartitioner<?> outputPartitioner,
 		OutputTag outputTag) {
 
@@ -94,7 +85,6 @@ public class StreamEdge implements Serializable {
 			targetVertex,
 			typeNumber,
 			ALWAYS_FLUSH_BUFFER_TIMEOUT,
-			selectedNames,
 			outputPartitioner,
 			outputTag,
 			ShuffleMode.UNDEFINED);
@@ -104,7 +94,6 @@ public class StreamEdge implements Serializable {
 		StreamNode sourceVertex,
 		StreamNode targetVertex,
 		int typeNumber,
-		List<String> selectedNames,
 		StreamPartitioner<?> outputPartitioner,
 		OutputTag outputTag,
 		ShuffleMode shuffleMode) {
@@ -114,7 +103,6 @@ public class StreamEdge implements Serializable {
 			targetVertex,
 			typeNumber,
 			sourceVertex.getBufferTimeout(),
-			selectedNames,
 			outputPartitioner,
 			outputTag,
 			shuffleMode);
@@ -125,7 +113,6 @@ public class StreamEdge implements Serializable {
 		StreamNode targetVertex,
 		int typeNumber,
 		long bufferTimeout,
-		List<String> selectedNames,
 		StreamPartitioner<?> outputPartitioner,
 		OutputTag outputTag,
 		ShuffleMode shuffleMode) {
@@ -134,13 +121,12 @@ public class StreamEdge implements Serializable {
 		this.targetId = targetVertex.getId();
 		this.typeNumber = typeNumber;
 		this.bufferTimeout = bufferTimeout;
-		this.selectedNames = selectedNames;
 		this.outputPartitioner = outputPartitioner;
 		this.outputTag = outputTag;
 		this.sourceOperatorName = sourceVertex.getOperatorName();
 		this.targetOperatorName = targetVertex.getOperatorName();
 		this.shuffleMode = checkNotNull(shuffleMode);
-		this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames + "_" + outputPartitioner;
+		this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber  + "_" + outputPartitioner;
 	}
 
 	public int getSourceId() {
@@ -155,10 +141,6 @@ public class StreamEdge implements Serializable {
 		return typeNumber;
 	}
 
-	public List<String> getSelectedNames() {
-		return selectedNames;
-	}
-
 	public OutputTag getOutputTag() {
 		return this.outputTag;
 	}
@@ -206,7 +188,7 @@ public class StreamEdge implements Serializable {
 	@Override
 	public String toString() {
 		return "(" + (sourceOperatorName + "-" + sourceId) + " -> " + (targetOperatorName + "-" + targetId)
-			+ ", typeNumber=" + typeNumber + ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
+			+ ", typeNumber=" + typeNumber + ", outputPartitioner=" + outputPartitioner
 			+ ", bufferTimeout=" + bufferTimeout + ", outputTag=" + outputTag + ')';
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 7df5c28..c978435 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
@@ -106,7 +105,6 @@ public class StreamGraph implements Pipeline {
 	private Map<Integer, StreamNode> streamNodes;
 	private Set<Integer> sources;
 	private Set<Integer> sinks;
-	private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
 	private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
 	private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, ShuffleMode>> virtualPartitionNodes;
 
@@ -129,7 +127,6 @@ public class StreamGraph implements Pipeline {
 	 */
 	public void clear() {
 		streamNodes = new HashMap<>();
-		virtualSelectNodes = new HashMap<>();
 		virtualSideOutputNodes = new HashMap<>();
 		virtualPartitionNodes = new HashMap<>();
 		vertexIDtoBrokerID = new HashMap<>();
@@ -392,7 +389,6 @@ public class StreamGraph implements Pipeline {
 				coLocationGroup,
 				operatorFactory,
 				operatorName,
-				new ArrayList<OutputSelector<?>>(),
 				vertexClass);
 
 		streamNodes.put(vertexID, vertex);
@@ -401,27 +397,6 @@ public class StreamGraph implements Pipeline {
 	}
 
 	/**
-	 * Adds a new virtual node that is used to connect a downstream vertex to only the outputs
-	 * with the selected names.
-	 *
-	 * <p>When adding an edge from the virtual node to a downstream node the connection will be made
-	 * to the original node, only with the selected names given here.
-	 *
-	 * @param originalId ID of the node that should be connected to.
-	 * @param virtualId ID of the virtual node.
-	 * @param selectedNames The selected names.
-	 */
-	public void addVirtualSelectNode(Integer originalId, Integer virtualId, List<String> selectedNames) {
-
-		if (virtualSelectNodes.containsKey(virtualId)) {
-			throw new IllegalStateException("Already has virtual select node with id " + virtualId);
-		}
-
-		virtualSelectNodes.put(virtualId,
-				new Tuple2<Integer, List<String>>(originalId, selectedNames));
-	}
-
-	/**
 	 * Adds a new virtual node that is used to connect a downstream vertex to only the outputs with
 	 * the selected side-output {@link OutputTag}.
 	 *
@@ -488,9 +463,6 @@ public class StreamGraph implements Pipeline {
 		if (virtualSideOutputNodes.containsKey(id)) {
 			Integer mappedId = virtualSideOutputNodes.get(id).f0;
 			return getSlotSharingGroup(mappedId);
-		} else if (virtualSelectNodes.containsKey(id)) {
-			Integer mappedId = virtualSelectNodes.get(id).f0;
-			return getSlotSharingGroup(mappedId);
 		} else if (virtualPartitionNodes.containsKey(id)) {
 			Integer mappedId = virtualPartitionNodes.get(id).f0;
 			return getSlotSharingGroup(mappedId);
@@ -526,14 +498,6 @@ public class StreamGraph implements Pipeline {
 				outputTag = virtualSideOutputNodes.get(virtualId).f1;
 			}
 			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
-		} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
-			int virtualId = upStreamVertexID;
-			upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
-			if (outputNames.isEmpty()) {
-				// selections that happen downstream override earlier selections
-				outputNames = virtualSelectNodes.get(virtualId).f1;
-			}
-			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
 		} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
 			int virtualId = upStreamVertexID;
 			upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
@@ -567,28 +531,14 @@ public class StreamGraph implements Pipeline {
 				shuffleMode = ShuffleMode.UNDEFINED;
 			}
 
-			StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
+			StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber,
+				partitioner, outputTag, shuffleMode);
 
 			getStreamNode(edge.getSourceId()).addOutEdge(edge);
 			getStreamNode(edge.getTargetId()).addInEdge(edge);
 		}
 	}
 
-	public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
-		if (virtualPartitionNodes.containsKey(vertexID)) {
-			addOutputSelector(virtualPartitionNodes.get(vertexID).f0, outputSelector);
-		} else if (virtualSelectNodes.containsKey(vertexID)) {
-			addOutputSelector(virtualSelectNodes.get(vertexID).f0, outputSelector);
-		} else {
-			getStreamNode(vertexID).addOutputSelector(outputSelector);
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Outputselector set for {}", vertexID);
-			}
-		}
-
-	}
-
 	public void setParallelism(Integer vertexID, int parallelism) {
 		if (getStreamNode(vertexID) != null) {
 			getStreamNode(vertexID).setParallelism(parallelism);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 0f6c0e7..d689c99 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -41,11 +41,9 @@ import org.apache.flink.streaming.api.transformations.LegacySourceTransformation
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
-import org.apache.flink.streaming.api.transformations.SelectTransformation;
 import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.streaming.api.transformations.SplitTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
 import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
@@ -260,10 +258,6 @@ public class StreamGraphGenerator {
 			transformedIds = transformSink((SinkTransformation<?>) transform);
 		} else if (transform instanceof UnionTransformation<?>) {
 			transformedIds = transformUnion((UnionTransformation<?>) transform);
-		} else if (transform instanceof SplitTransformation<?>) {
-			transformedIds = transformSplit((SplitTransformation<?>) transform);
-		} else if (transform instanceof SelectTransformation<?>) {
-			transformedIds = transformSelect((SelectTransformation<?>) transform);
 		} else if (transform instanceof FeedbackTransformation<?>) {
 			transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
 		} else if (transform instanceof CoFeedbackTransformation<?>) {
@@ -352,56 +346,6 @@ public class StreamGraphGenerator {
 	}
 
 	/**
-	 * Transforms a {@code SplitTransformation}.
-	 *
-	 * <p>We add the output selector to previously transformed nodes.
-	 */
-	private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
-
-		Transformation<T> input = split.getInput();
-		Collection<Integer> resultIds = transform(input);
-
-		validateSplitTransformation(input);
-
-		// the recursive transform call might have transformed this already
-		if (alreadyTransformed.containsKey(split)) {
-			return alreadyTransformed.get(split);
-		}
-
-		for (int inputId : resultIds) {
-			streamGraph.addOutputSelector(inputId, split.getOutputSelector());
-		}
-
-		return resultIds;
-	}
-
-	/**
-	 * Transforms a {@code SelectTransformation}.
-	 *
-	 * <p>For this we create a virtual node in the {@code StreamGraph} holds the selected names.
-	 *
-	 * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
-	 */
-	private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
-		Transformation<T> input = select.getInput();
-		Collection<Integer> resultIds = transform(input);
-
-		// the recursive transform might have already transformed this
-		if (alreadyTransformed.containsKey(select)) {
-			return alreadyTransformed.get(select);
-		}
-
-		List<Integer> virtualResultIds = new ArrayList<>();
-
-		for (int inputId : resultIds) {
-			int virtualId = Transformation.getNewNodeId();
-			streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
-			virtualResultIds.add(virtualId);
-		}
-		return virtualResultIds;
-	}
-
-	/**
 	 * Transforms a {@code SideOutputTransformation}.
 	 *
 	 * <p>For this we create a virtual node in the {@code StreamGraph} that holds the side-output
@@ -839,20 +783,4 @@ public class StreamGraphGenerator {
 			return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
 		}
 	}
-
-	private <T> void validateSplitTransformation(Transformation<T> input) {
-		if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
-			throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
-		} else if (input instanceof SideOutputTransformation) {
-			throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
-		} else if (input instanceof UnionTransformation) {
-			for (Transformation<T> transformation : ((UnionTransformation<T>) input).getInputs()) {
-				validateSplitTransformation(transformation);
-			}
-		} else if (input instanceof PartitionTransformation) {
-			validateSplitTransformation(((PartitionTransformation) input).getInput());
-		} else {
-			return;
-		}
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 8f25d6c..f36d95e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -69,7 +68,6 @@ public class StreamNode implements Serializable {
 	private TypeSerializer<?> stateKeySerializer;
 
 	private transient StreamOperatorFactory<?> operatorFactory;
-	private List<OutputSelector<?>> outputSelectors;
 	private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer[0];
 	private TypeSerializer<?> typeSerializerOut;
 
@@ -91,10 +89,9 @@ public class StreamNode implements Serializable {
 			@Nullable String coLocationGroup,
 			StreamOperator<?> operator,
 			String operatorName,
-			List<OutputSelector<?>> outputSelector,
 			Class<? extends AbstractInvokable> jobVertexClass) {
 		this(id, slotSharingGroup, coLocationGroup, SimpleOperatorFactory.of(operator),
-				operatorName, outputSelector, jobVertexClass);
+				operatorName, jobVertexClass);
 	}
 
 	public StreamNode(
@@ -103,12 +100,10 @@ public class StreamNode implements Serializable {
 			@Nullable String coLocationGroup,
 			StreamOperatorFactory<?> operatorFactory,
 			String operatorName,
-			List<OutputSelector<?>> outputSelector,
 			Class<? extends AbstractInvokable> jobVertexClass) {
 		this.id = id;
 		this.operatorName = operatorName;
 		this.operatorFactory = operatorFactory;
-		this.outputSelectors = outputSelector;
 		this.jobVertexClass = jobVertexClass;
 		this.slotSharingGroup = slotSharingGroup;
 		this.coLocationGroup = coLocationGroup;
@@ -230,14 +225,6 @@ public class StreamNode implements Serializable {
 		return operatorName;
 	}
 
-	public List<OutputSelector<?>> getOutputSelectors() {
-		return outputSelectors;
-	}
-
-	public void addOutputSelector(OutputSelector<?> outputSelector) {
-		this.outputSelectors.add(outputSelector);
-	}
-
 	public void setSerializersIn(TypeSerializer<?> ...typeSerializersIn) {
 		checkArgument(typeSerializersIn.length > 0);
 		this.typeSerializersIn = typeSerializersIn;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 31eafd6..758f479 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -496,7 +496,6 @@ public class StreamingJobGraphGenerator {
 		}
 
 		config.setStreamOperatorFactory(vertex.getOperatorFactory());
-		config.setOutputSelectors(vertex.getOutputSelectors());
 
 		config.setNumberOfOutputs(nonChainableOutputs.size());
 		config.setNonChainedOutputs(nonChainableOutputs);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
deleted file mode 100644
index 7343700..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.transformations;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Transformation;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a selection of only certain upstream elements. This must
- * follow a {@link org.apache.flink.streaming.api.transformations.SplitTransformation} that
- * splits elements into several logical streams with assigned names.
- *
- * <p>This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code SelectTransformation}
- */
-@Internal
-public class SelectTransformation<T> extends Transformation<T> {
-
-	private final Transformation<T> input;
-	private final List<String> selectedNames;
-
-	/**
-	 * Creates a new {@code SelectionTransformation} from the given input that only selects
-	 * the streams with the selected names.
-	 *
-	 * @param input The input {@code Transformation}
-	 * @param selectedNames The names from the upstream {@code SplitTransformation} that this
-	 *                      {@code SelectTransformation} selects.
-	 */
-	public SelectTransformation(
-		Transformation<T> input,
-			List<String> selectedNames) {
-		super("Select", input.getOutputType(), input.getParallelism());
-		this.input = input;
-		this.selectedNames = selectedNames;
-	}
-
-	/**
-	 * Returns the input {@code Transformation}.
-	 */
-	public Transformation<T> getInput() {
-		return input;
-	}
-
-	/**
-	 * Returns the names of the split streams that this {@code SelectTransformation} selects.
-	 */
-	public List<String> getSelectedNames() {
-		return selectedNames;
-	}
-
-	@Override
-	public Collection<Transformation<?>> getTransitivePredecessors() {
-		List<Transformation<?>> result = Lists.newArrayList();
-		result.add(this);
-		result.addAll(input.getTransitivePredecessors());
-		return result;
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
deleted file mode 100644
index 2ff312d..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.transformations;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a split of one
- * {@link org.apache.flink.streaming.api.datastream.DataStream} into several {@code DataStreams}
- * using an {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
- *
- * <p>This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code SplitTransformation}
- */
-@Internal
-public class SplitTransformation<T> extends Transformation<T> {
-
-	private final Transformation<T> input;
-
-	private final OutputSelector<T> outputSelector;
-
-	/**
-	 * Creates a new {@code SplitTransformation} from the given input and {@code OutputSelector}.
-	 *
-	 * @param input The input {@code Transformation}
-	 * @param outputSelector The output selector
-	 */
-	public SplitTransformation(
-		Transformation<T> input,
-			OutputSelector<T> outputSelector) {
-		super("Split", input.getOutputType(), input.getParallelism());
-		this.input = input;
-		this.outputSelector = outputSelector;
-	}
-
-	/**
-	 * Returns the input {@code Transformation}.
-	 */
-	public Transformation<T> getInput() {
-		return input;
-	}
-
-	/**
-	 * Returns the {@code OutputSelector}.
-	 */
-	public OutputSelector<T> getOutputSelector() {
-		return outputSelector;
-	}
-
-	@Override
-	public Collection<Transformation<?>> getTransitivePredecessors() {
-		List<Transformation<?>> result = Lists.newArrayList();
-		result.add(this);
-		result.addAll(input.getTransitivePredecessors());
-		return result;
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 6efb5dd..ebfa615 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -31,9 +31,6 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
-import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.SourceInputConfig;
@@ -516,47 +513,25 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			allOutputs.add(new Tuple2<>(output, outputEdge));
 		}
 
-		// if there are multiple outputs, or the outputs are directed, we need to
-		// wrap them as one output
-
-		List<OutputSelector<T>> selectors = operatorConfig.getOutputSelectors(userCodeClassloader);
-
-		if (selectors == null || selectors.isEmpty()) {
-			// simple path, no selector necessary
-			if (allOutputs.size() == 1) {
-				return allOutputs.get(0).f0;
-			}
-			else {
-				// send to N outputs. Note that this includes the special case
-				// of sending to zero outputs
-				@SuppressWarnings({"unchecked", "rawtypes"})
-				Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
-				for (int i = 0; i < allOutputs.size(); i++) {
-					asArray[i] = allOutputs.get(i).f0;
-				}
-
-				// This is the inverse of creating the normal ChainingOutput.
-				// If the chaining output does not copy we need to copy in the broadcast output,
-				// otherwise multi-chaining would not work correctly.
-				if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-					return new CopyingBroadcastingOutputCollector<>(asArray, this);
-				} else  {
-					return new BroadcastingOutputCollector<>(asArray, this);
-				}
+		if (allOutputs.size() == 1) {
+			return allOutputs.get(0).f0;
+		} else {
+			// send to N outputs. Note that this includes the special case
+			// of sending to zero outputs
+			@SuppressWarnings({"unchecked"})
+			Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
+			for (int i = 0; i < allOutputs.size(); i++) {
+				asArray[i] = allOutputs.get(i).f0;
 			}
-		}
-		else {
-			// selector present, more complex routing necessary
 
 			// This is the inverse of creating the normal ChainingOutput.
 			// If the chaining output does not copy we need to copy in the broadcast output,
 			// otherwise multi-chaining would not work correctly.
 			if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-				return new CopyingDirectedOutput<>(selectors, allOutputs);
+				return new CopyingBroadcastingOutputCollector<>(asArray, this);
 			} else {
-				return new DirectedOutput<>(selectors, allOutputs);
+				return new BroadcastingOutputCollector<>(asArray, this);
 			}
-
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 4ae3218..d11c07b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -41,7 +41,6 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
 import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
@@ -50,7 +49,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -83,7 +81,6 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.core.StringStartsWith;
@@ -996,24 +993,6 @@ public class DataStreamTest extends TestLogger {
 			fail(e.getMessage());
 		}
 
-		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
-		SplitStream<Integer> split = unionFilter.split(outputSelector);
-		split.select("dummy").addSink(new DiscardingSink<Integer>());
-		List<OutputSelector<?>> outputSelectors = getStreamGraph(env).getStreamNode(unionFilter.getId()).getOutputSelectors();
-		assertEquals(1, outputSelectors.size());
-		assertEquals(outputSelector, outputSelectors.get(0));
-
-		DataStream<Integer> select = split.select("a");
-		DataStreamSink<Integer> sink = select.print();
-
-		StreamEdge splitEdge = getStreamGraph(env).getStreamEdges(unionFilter.getId(), sink.getTransformation().getId()).get(0);
-		assertEquals("a", splitEdge.getSelectedNames().get(0));
-
-		DataStreamSink<Integer> sinkWithIdentifier = select.print("identifier");
-		StreamEdge newSplitEdge = getStreamGraph(env).getStreamEdges(unionFilter.getId(), sinkWithIdentifier.getTransformation().getId()).get(0);
-		assertEquals("a", newSplitEdge.getSelectedNames().get(0));
-
 		ConnectedStreams<Integer, Integer> connect = map.connect(flatMap);
 		CoMapFunction<Integer, Integer, String> coMapper = new CoMapFunction<Integer, Integer, String>() {
 			private static final long serialVersionUID = 1L;
@@ -1147,91 +1126,6 @@ public class DataStreamTest extends TestLogger {
 	}
 
 	/////////////////////////////////////////////////////////////
-	// Split testing
-	/////////////////////////////////////////////////////////////
-
-	@Test
-	public void testConsecutiveSplitRejection() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Integer> src = env.fromElements(0, 0);
-
-		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
-		src.split(outputSelector).split(outputSelector).addSink(new DiscardingSink<>());
-
-		expectedException.expect(IllegalStateException.class);
-		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
-
-		getStreamGraph(env);
-	}
-
-	@Test
-	public void testSplitAfterSideOutputRejection() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Integer> src = env.fromElements(0, 0);
-
-		OutputTag<Integer> outputTag = new OutputTag<Integer>("dummy"){};
-		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
-		src.getSideOutput(outputTag).split(outputSelector).addSink(new DiscardingSink<>());
-
-		expectedException.expect(IllegalStateException.class);
-		expectedException.expectMessage("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
-
-		getStreamGraph(env);
-	}
-
-	@Test
-	public void testSelectBetweenConsecutiveSplitRejection() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Integer> src = env.fromElements(0, 0);
-
-		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
-		src.split(outputSelector).select("dummy").split(outputSelector).addSink(new DiscardingSink<>());
-
-		expectedException.expect(IllegalStateException.class);
-		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
-
-		getStreamGraph(env);
-	}
-
-	@Test
-	public void testUnionBetweenConsecutiveSplitRejection() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Integer> src = env.fromElements(0, 0);
-
-		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
-		src.split(outputSelector).select("dummy").union(src.map(x -> x)).split(outputSelector).addSink(new DiscardingSink<>());
-
-		expectedException.expect(IllegalStateException.class);
-		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
-
-		getStreamGraph(env);
-	}
-
-	@Test
-	public void testKeybyBetweenConsecutiveSplitRejection() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStreamSource<Integer> src = env.fromElements(0, 0);
-
-		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
-		src.split(outputSelector).select("dummy").keyBy(x -> x).split(outputSelector).addSink(new DiscardingSink<>());
-
-		expectedException.expect(IllegalStateException.class);
-		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
-
-		getStreamGraph(env);
-	}
-
-	/////////////////////////////////////////////////////////////
 	// KeyBy testing
 	/////////////////////////////////////////////////////////////
 
@@ -1592,11 +1486,4 @@ public class DataStreamTest extends TestLogger {
 	private enum TestEnum {
 		FOO, BAR
 	}
-
-	private class DummyOutputSelector<Integer> implements OutputSelector<Integer> {
-		@Override
-		public Iterable<String> select(Integer value) {
-			return null;
-		}
-	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
deleted file mode 100644
index a2f8ed6..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for {@link OutputSelector}.
- */
-public class OutputSelectorTest {
-
-	static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Iterable<String> select(Tuple1<Integer> tuple) {
-
-			String[] outputs = new String[tuple.f0];
-
-			for (Integer i = 0; i < tuple.f0; i++) {
-				outputs[i] = i.toString();
-			}
-			return Arrays.asList(outputs);
-		}
-	}
-
-	@Test
-	public void testGetOutputs() {
-		OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
-		List<String> expectedOutputs = new ArrayList<String>();
-		expectedOutputs.add("0");
-		expectedOutputs.add("1");
-		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(2)));
-		expectedOutputs.add("2");
-		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(3)));
-	}
-
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
deleted file mode 100644
index 8f33e19..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-
-/**
- * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}.
- */
-public class SplitSideOutputTest {
-
-	private static final OutputTag<String> outputTag = new OutputTag<String>("outputTag") {};
-
-	@Test
-	public void testSideOutputAfterSelectIsForbidden() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
-			.process(new DummyProcessFunction());
-
-		processInput.split(Collections::singleton);
-
-		try {
-			processInput.getSideOutput(outputTag);
-			Assert.fail("Should have failed early with an exception.");
-		} catch (UnsupportedOperationException expected){
-			// expected
-		}
-	}
-
-	@Test
-	public void testSelectAfterSideOutputIsForbidden() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
-			.process(new DummyProcessFunction());
-
-		processInput.getSideOutput(outputTag);
-
-		try {
-			processInput.split(Collections::singleton);
-			Assert.fail("Should have failed early with an exception.");
-		} catch (UnsupportedOperationException expected){
-			// expected
-		}
-	}
-
-	private static final class DummyProcessFunction extends ProcessFunction<String, String> {
-
-		@Override
-		public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
-		}
-	}
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 3d0f502..9124c49 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -54,7 +54,6 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.util.EvenOddOutputSelector;
 import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.util.TestLogger;
 
@@ -150,50 +149,38 @@ public class StreamGraphGeneratorTest extends TestLogger {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Integer> source = env.fromElements(1, 10);
-
 		DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
 
 		// verify that only the partitioning that was set last is used
 		DataStream<Integer> broadcastMap = rebalanceMap
-				.forward()
-				.global()
-				.broadcast()
-				.map(new NoOpIntMap());
+			.forward()
+			.global()
+			.broadcast()
+			.map(new NoOpIntMap());
 
 		broadcastMap.addSink(new DiscardingSink<>());
 
-		// verify that partitioning is preserved across union and split/select
-		EvenOddOutputSelector selector1 = new EvenOddOutputSelector();
-		EvenOddOutputSelector selector2 = new EvenOddOutputSelector();
-		EvenOddOutputSelector selector3 = new EvenOddOutputSelector();
-
-		DataStream<Integer> map1Operator = rebalanceMap
-				.map(new NoOpIntMap());
+		DataStream<Integer> broadcastOperator = rebalanceMap
+			.map(new NoOpIntMap())
+			.name("broadcast");
 
-		DataStream<Integer> map1 = map1Operator
-				.broadcast()
-				.split(selector1)
-				.select("even");
+		DataStream<Integer> map1 = broadcastOperator.broadcast();
 
-		DataStream<Integer> map2Operator = rebalanceMap
-				.map(new NoOpIntMap());
+		DataStream<Integer> globalOperator = rebalanceMap
+			.map(new NoOpIntMap())
+			.name("global");
 
-		DataStream<Integer> map2 = map2Operator
-				.split(selector2)
-				.select("odd")
-				.global();
+		DataStream<Integer> map2 = globalOperator.global();
 
-		DataStream<Integer> map3Operator = rebalanceMap
-				.map(new NoOpIntMap());
+		DataStream<Integer> shuffleOperator = rebalanceMap
+			.map(new NoOpIntMap())
+			.name("shuffle");
 
-		DataStream<Integer> map3 = map3Operator
-				.global()
-				.split(selector3)
-				.select("even")
-				.shuffle();
+		DataStream<Integer> map3 = shuffleOperator.shuffle();
 
 		SingleOutputStreamOperator<Integer> unionedMap = map1.union(map2).union(map3)
-				.map(new NoOpIntMap());
+			.map(new NoOpIntMap())
+			.name("union");
 
 		unionedMap.addSink(new DiscardingSink<>());
 
@@ -206,68 +193,10 @@ public class StreamGraphGeneratorTest extends TestLogger {
 		assertTrue(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
 		assertEquals(rebalanceMap.getId(), graph.getSourceVertex(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0)).getId());
 
-		// verify that partitioning in unions is preserved and that it works across split/select
-		assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
-		assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
-		assertTrue(graph.getStreamNode(map1Operator.getId()).getOutputSelectors().contains(selector1));
-
-		assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof GlobalPartitioner);
-		assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("odd"));
-		assertTrue(graph.getStreamNode(map2Operator.getId()).getOutputSelectors().contains(selector2));
-
-		assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof ShufflePartitioner);
-		assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
-		assertTrue(graph.getStreamNode(map3Operator.getId()).getOutputSelectors().contains(selector3));
-	}
-
-	/**
-	 * This tests whether virtual Transformations behave correctly.
-	 *
-	 * <p>Checks whether output selector, partitioning works correctly when applied on a union.
-	 */
-	@Test
-	public void testVirtualTransformations2() throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Integer> source = env.fromElements(1, 10);
-
-		DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
-
-		DataStream<Integer> map1 = rebalanceMap
-				.map(new NoOpIntMap());
-
-		DataStream<Integer> map2 = rebalanceMap
-				.map(new NoOpIntMap());
-
-		DataStream<Integer> map3 = rebalanceMap
-				.map(new NoOpIntMap());
-
-		EvenOddOutputSelector selector = new EvenOddOutputSelector();
-
-		SingleOutputStreamOperator<Integer> unionedMap = map1.union(map2).union(map3)
-				.broadcast()
-				.split(selector)
-				.select("foo")
-				.map(new NoOpIntMap());
-
-		unionedMap.addSink(new DiscardingSink<>());
-
-		StreamGraph graph = env.getStreamGraph();
-
-		// verify that the properties are correctly set on all input operators
-		assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
-		assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
-		assertTrue(graph.getStreamNode(map1.getId()).getOutputSelectors().contains(selector));
-
-		assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
-		assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
-		assertTrue(graph.getStreamNode(map2.getId()).getOutputSelectors().contains(selector));
-
-		assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
-		assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
-		assertTrue(graph.getStreamNode(map3.getId()).getOutputSelectors().contains(selector));
-
+		// verify that partitioning in unions is preserved
+		assertTrue(graph.getStreamNode(broadcastOperator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+		assertTrue(graph.getStreamNode(globalOperator.getId()).getOutEdges().get(0).getPartitioner() instanceof GlobalPartitioner);
+		assertTrue(graph.getStreamNode(shuffleOperator.getId()).getOutEdges().get(0).getPartitioner() instanceof ShufflePartitioner);
 	}
 
 	/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index b908144..d9d5174 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -39,12 +39,13 @@ import org.apache.flink.streaming.runtime.tasks.OperatorChain;
 import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.MockStreamTaskBuilder;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -190,20 +191,25 @@ public class StreamOperatorChainingTest {
 		input = input
 				.map(value -> value);
 
-		SplitStream<Integer> split = input.split(new OutputSelector<Integer>() {
+		OutputTag<Integer> oneOutput = new OutputTag<Integer>("one") {};
+		OutputTag<Integer> otherOutput = new OutputTag<Integer>("other") {};
+		SingleOutputStreamOperator<Object> split = input.process(new ProcessFunction<Integer, Object>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
-			public Iterable<String> select(Integer value) {
+			public void processElement(
+					Integer value,
+					Context ctx, Collector<Object> out) throws Exception {
 				if (value.equals(1)) {
-					return Collections.singletonList("one");
+					ctx.output(oneOutput, value);
 				} else {
-					return Collections.singletonList("other");
+					ctx.output(otherOutput, value);
 				}
+
 			}
 		});
 
-		split.select("one")
+		split.getSideOutput(oneOutput)
 				.map(value -> "First 1: " + value)
 				.addSink(new SinkFunction<String>() {
 
@@ -213,7 +219,7 @@ public class StreamOperatorChainingTest {
 					}
 				});
 
-		split.select("one")
+		split.getSideOutput(oneOutput)
 				.map(value -> "First 2: " + value)
 				.addSink(new SinkFunction<String>() {
 
@@ -223,7 +229,7 @@ public class StreamOperatorChainingTest {
 					}
 				});
 
-		split.select("other")
+		split.getSideOutput(otherOutput)
 				.map(value -> "Second: " + value)
 				.addSink(new SinkFunction<String>() {
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 9ed30d4..5182651 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -856,7 +856,6 @@ public class OneInputStreamTaskTest extends TestLogger {
 					null,
 					(StreamOperator<?>) null,
 					null,
-					null,
 					null
 				),
 				new StreamNode(
@@ -865,11 +864,9 @@ public class OneInputStreamTaskTest extends TestLogger {
 					null,
 					(StreamOperator<?>) null,
 					null,
-					null,
 					null
 				),
 				0,
-				Collections.emptyList(),
 				null,
 				null
 			);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 6164163..5197dc4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -126,10 +126,9 @@ public class StreamConfigChainer<OWNER> {
 
 		tailConfig.setChainedOutputs(Collections.singletonList(
 			new StreamEdge(
-				new StreamNode(tailConfig.getChainIndex(), null, null, (StreamOperator<?>) null, null, null, null),
-				new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null),
+				new StreamNode(tailConfig.getChainIndex(), null, null, (StreamOperator<?>) null, null, null),
+				new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null),
 				0,
-				Collections.<String>emptyList(),
 				null,
 				null)));
 		tailConfig = new StreamConfig(new Configuration());
@@ -153,15 +152,13 @@ public class StreamConfigChainer<OWNER> {
 		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
 		outEdgesInOrder.add(
 			new StreamEdge(
-				new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null),
-				new StreamNode(chainIndex , null, null, (StreamOperator<?>) null, null, null, null),
+				new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null),
+				new StreamNode(chainIndex , null, null, (StreamOperator<?>) null, null, null),
 				0,
-				Collections.<String>emptyList(),
 				new BroadcastPartitioner<Object>(),
 				null));
 
 		tailConfig.setChainEnd();
-		tailConfig.setOutputSelectors(Collections.emptyList());
 		tailConfig.setNumberOfOutputs(1);
 		tailConfig.setOutEdgesInOrder(outEdgesInOrder);
 		tailConfig.setNonChainedOutputs(outEdgesInOrder);
@@ -186,7 +183,6 @@ public class StreamConfigChainer<OWNER> {
 			null,
 			dummyOperator,
 			"source dummy",
-			new LinkedList<>(),
 			SourceStreamTask.class);
 		StreamNode targetVertexDummy = new StreamNode(
 			MAIN_NODE_ID + 1,
@@ -194,14 +190,12 @@ public class StreamConfigChainer<OWNER> {
 			null,
 			dummyOperator,
 			"target dummy",
-			new LinkedList<>(),
 			SourceStreamTask.class);
 
 		outEdgesInOrder.add(new StreamEdge(
 			sourceVertexDummy,
 			targetVertexDummy,
 			0,
-			new LinkedList<>(),
 			new BroadcastPartitioner<>(),
 			null));
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index 0ed1a41..af4308a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -165,7 +165,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
 		inputGates = new StreamTestSingleInputGate[inputChannelsPerGate.size()];
 		List<StreamEdge> inPhysicalEdges = new LinkedList<>();
 
-		StreamNode mainNode = new StreamNode(StreamConfigChainer.MAIN_NODE_ID, null, null, (StreamOperator<?>) null, null, null, null);
+		StreamNode mainNode = new StreamNode(StreamConfigChainer.MAIN_NODE_ID, null, null, (StreamOperator<?>) null, null, null);
 		for (int i = 0; i < inputs.size(); i++) {
 			if ((inputs.get(i) instanceof NetworkInputConfig)) {
 				NetworkInputConfig networkInput = (NetworkInputConfig) inputs.get(i);
@@ -199,12 +199,11 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
 			inputSerializer,
 			bufferSize);
 
-		StreamNode sourceVertexDummy = new StreamNode(0, null, null, (StreamOperator<?>) null, null, null, SourceStreamTask.class);
+		StreamNode sourceVertexDummy = new StreamNode(0, null, null, (StreamOperator<?>) null, null, SourceStreamTask.class);
 		StreamEdge streamEdge = new StreamEdge(
 			sourceVertexDummy,
 			targetVertexDummy,
 			gateIndex + 1,
-			new LinkedList<>(),
 			new BroadcastPartitioner<>(),
 			null);
 
@@ -222,10 +221,9 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
 		List<StreamEdge> outEdgesInOrder = new LinkedList<>();
 
 		StreamEdge sourceToMainEdge = new StreamEdge(
-			new StreamNode(maxNodeId + inputId + 1337, null, null, (StreamOperator<?>) null, null, null, null),
+			new StreamNode(maxNodeId + inputId + 1337, null, null, (StreamOperator<?>) null, null, null),
 			mainNode,
 			0,
-			new LinkedList<>(),
 			new ForwardPartitioner<>(),
 			null);
 		outEdgesInOrder.add(sourceToMainEdge);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index e0ed885..175a244 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -202,7 +202,6 @@ public class StreamTaskTestHarness<OUT> {
 		setupCalled = true;
 		streamConfig.setChainStart();
 		streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
-		streamConfig.setOutputSelectors(Collections.emptyList());
 		streamConfig.setNumberOfOutputs(1);
 		streamConfig.setTypeSerializerOut(outputSerializer);
 		streamConfig.setVertexID(0);
@@ -213,10 +212,10 @@ public class StreamTaskTestHarness<OUT> {
 		};
 
 		List<StreamEdge> outEdgesInOrder = new LinkedList<>();
-		StreamNode sourceVertexDummy = new StreamNode(0, "group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
-		StreamNode targetVertexDummy = new StreamNode(1, "group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);
+		StreamNode sourceVertexDummy = new StreamNode(0, "group", null, dummyOperator, "source dummy", SourceStreamTask.class);
+		StreamNode targetVertexDummy = new StreamNode(1, "group", null, dummyOperator, "target dummy", SourceStreamTask.class);
 
-		outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<>(), new BroadcastPartitioner<>(), null /* output tag */));
+		outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new BroadcastPartitioner<>(), null /* output tag */));
 
 		streamConfig.setOutEdgesInOrder(outEdgesInOrder);
 		streamConfig.setNonChainedOutputs(outEdgesInOrder);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index 719f16f..df1bc59 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -110,8 +110,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
 			private static final long serialVersionUID = 1L;
 		};
 
-		StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
-		StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);
+		StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", SourceStreamTask.class);
+		StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", SourceStreamTask.class);
 
 		for (int i = 0; i < numInputGates; i++) {
 
@@ -126,7 +126,6 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
 					StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
 							targetVertexDummy,
 							1,
-							new LinkedList<>(),
 							new BroadcastPartitioner<>(),
 							null /* output tag */);
 
@@ -143,7 +142,6 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
 					StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
 							targetVertexDummy,
 							2,
-							new LinkedList<>(),
 							new BroadcastPartitioner<>(),
 							null /* output tag */);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
deleted file mode 100644
index 26da5d3..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-import java.util.Arrays;
-
-/**
- * Tests for {@link OutputSelector}.
- */
-public class EvenOddOutputSelector implements OutputSelector<Integer> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Iterable<String> select(Integer value) {
-		return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
-	}
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
index ec31592..bbfa4bb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -41,7 +40,6 @@ public class MockStreamConfig extends StreamConfig {
 		super(configuration);
 
 		setChainStart();
-		setOutputSelectors(Collections.emptyList());
 		setNumberOfOutputs(numberOfOutputs);
 		setTypeSerializerOut(new StringSerializer());
 		setVertexID(0);
@@ -52,13 +50,13 @@ public class MockStreamConfig extends StreamConfig {
 			private static final long serialVersionUID = 1L;
 		};
 
-		StreamNode sourceVertex = new StreamNode(0, null, null, dummyOperator, "source", new ArrayList<>(), SourceStreamTask.class);
-		StreamNode targetVertex = new StreamNode(1, null, null, dummyOperator, "target", new ArrayList<>(), SourceStreamTask.class);
+		StreamNode sourceVertex = new StreamNode(0, null, null, dummyOperator, "source", SourceStreamTask.class);
+		StreamNode targetVertex = new StreamNode(1, null, null, dummyOperator, "target", SourceStreamTask.class);
 
 		List<StreamEdge> outEdgesInOrder = new ArrayList<>(numberOfOutputs);
 		for (int i = 0; i < numberOfOutputs; i++) {
 			outEdgesInOrder.add(
-				new StreamEdge(sourceVertex, targetVertex, numberOfOutputs, new ArrayList<>(), new BroadcastPartitioner<>(), null));
+				new StreamEdge(sourceVertex, targetVertex, numberOfOutputs, new BroadcastPartitioner<>(), null));
 		}
 		setOutEdgesInOrder(outEdgesInOrder);
 		setNonChainedOutputs(outEdgesInOrder);
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index c6295cc..9200cef 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -278,6 +278,11 @@ under the License.
 							<exclude>org.apache.flink.streaming.api.scala.WindowedStream#fold(java.lang.Object,scala.Function2,scala.Function4,org.apache.flink.api.common.typeinfo.TypeInformation,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
 							<exclude>org.apache.flink.streaming.api.scala.AllWindowedStream#fold(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
 							<exclude>org.apache.flink.streaming.api.scala.WindowedStream#fold(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+
+							<!-- DataStream#split was removed in 1.12 -->
+							<exclude>org.apache.flink.streaming.api.scala.DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)</exclude>
+							<exclude>org.apache.flink.streaming.api.scala.DataStream#split(scala.Function1)</exclude>
+							<exclude>org.apache.flink.streaming.api.scala.SplitStream</exclude>
 						</excludes>
 					</parameter>
 				</configuration>
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0608c53..7bd08d4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -32,7 +32,6 @@ import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
 import org.apache.flink.core.fs.{FileSystem, Path}
-import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.timestamps.{AscendingTimestampExtractor, BoundedOutOfOrdernessTimestampExtractor}
@@ -559,7 +558,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * stepfunction: initialStream => (feedback, output)
    *
    * A common pattern is to use output splitting to create feedback and output DataStream.
-   * Please refer to the [[split]] method of the DataStream
+   * Please see the side outputs of [[ProcessFunction]] method of the DataStream
    *
    * By default a DataStream with iteration will never terminate, but the user
    * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
@@ -898,37 +897,6 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   /**
-   *
-   * Operator used for directing tuples to specific named outputs using an
-   * OutputSelector. Calling this method on an operator creates a new
-   * [[SplitStream]].
-   *
-   * @deprecated Please use side output instead.
-   */
-  @deprecated
-  def split(selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))
-
-  /**
-   * Creates a new [[SplitStream]] that contains only the elements satisfying the
-   *  given output selector predicate.
-   *
-   * @deprecated Please use side output instead.
-   */
-  @deprecated
-  def split(fun: T => TraversableOnce[String]): SplitStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("OutputSelector must not be null.")
-    }
-    val cleanFun = clean(fun)
-    val selector = new OutputSelector[T] {
-      def select(in: T): java.lang.Iterable[String] = {
-        cleanFun(in).toIterable.asJava
-      }
-    }
-    split(selector)
-  }
-
-  /**
    * Creates a co-group operation. See [[CoGroupedStreams]] for an example of how the keys
    * and window can be specified.
    */
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
deleted file mode 100644
index 16a5a48..0000000
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.annotation.Public
-import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
-
-/**
- * The SplitStream represents an operator that has been split using an
- * [[org.apache.flink.streaming.api.collector.selector.OutputSelector]].
- * Named outputs can be selected using the [[SplitStream#select()]] function.
- * To apply a transformation on the whole output simply call
- * the appropriate method on this stream.
- */
-@deprecated("Please use side outputs instead of split/select", "deprecated since 1.8.2")
-@Public
-class SplitStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){
-
-  /**
-   *  Sets the output names for which the next operator will receive values.
-   */
-  def select(outputNames: String*): DataStream[T] = 
-    asScalaStream(javaStream.select(outputNames: _*))
-}
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index ef96fd1..ffc3d09 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -17,12 +17,10 @@
  */
 
 package org.apache.flink.streaming.api
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreator}
 import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
 import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams }
 import org.apache.flink.streaming.api.datastream.{ BroadcastConnectedStream => BroadcastConnectedJavaStreams }
 import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream }
@@ -51,12 +49,6 @@ package object scala {
                                              = new KeyedStream[R, K](stream)
 
   /**
-   * Converts an [[org.apache.flink.streaming.api.datastream.SplitStream]] to a
-   * [[org.apache.flink.streaming.api.scala.SplitStream]].
-   */
-  private[flink] def asScalaStream[R](stream: SplitJavaStream[R])
-                                             = new SplitStream[R](stream)
-  /**
    * Converts an [[org.apache.flink.streaming.api.datastream.ConnectedStreams]] to a
    * [[org.apache.flink.streaming.api.scala.ConnectedStreams]].
    */
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 3d7cbb2..d90dd25 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.ParallelIteratorInputFormat
 import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
@@ -572,34 +571,6 @@ class DataStreamTest extends AbstractTestBase {
       }
     }
 
-    val outputSelector = new OutputSelector[Int] {
-      override def select(value: Int): lang.Iterable[String] = null
-    }
-
-    val split = unionFilter.split(outputSelector)
-    split.print()
-    val outputSelectors = getStreamGraph(env).getStreamNode(unionFilter.getId).getOutputSelectors
-    assert(1 == outputSelectors.size)
-    assert(outputSelector == outputSelectors.get(0))
-
-    unionFilter.split(x => List("a")).print()
-    val moreOutputSelectors = getStreamGraph(env)
-      .getStreamNode(unionFilter.getId)
-      .getOutputSelectors
-    assert(2 == moreOutputSelectors.size)
-
-    val select = split.select("a")
-    val sink = select.print()
-    val splitEdge =
-      getStreamGraph(env).getStreamEdgesOrThrow(unionFilter.getId, sink.getTransformation.getId)
-    assert("a" == splitEdge.get(0).getSelectedNames.get(0))
-
-    val sinkWithIdentifier = select.print("identifier")
-    val newSplitEdge = getStreamGraph(env).getStreamEdgesOrThrow(
-      unionFilter.getId,
-      sinkWithIdentifier.getTransformation.getId)
-    assert("a" == newSplitEdge.get(0).getSelectedNames.get(0))
-
     val connect = map.connect(flatMap)
 
     val coMapFunction =
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index d8737e1..0903653 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -17,15 +17,15 @@
  */
 package org.apache.flink.streaming.api.scala
 
-import java.lang.reflect.Method
-
 import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
 
-import scala.language.existentials
-
 import org.junit.Test
 
+import java.lang.reflect.Method
+
+import scala.language.existentials
+
 /**
  * This checks whether the streaming Scala API is up to feature parity with the Java API.
  */
@@ -116,11 +116,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       classOf[ConnectedStreams[_,_]])
 
     checkMethods(
-      "SplitStream", "SplitStream",
-      classOf[org.apache.flink.streaming.api.datastream.SplitStream[_]],
-      classOf[SplitStream[_]])
-
-    checkMethods(
       "WindowedStream", "WindowedStream",
       classOf[org.apache.flink.streaming.api.datastream.WindowedStream[_, _, _]],
       classOf[WindowedStream[_, _, _]])
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java
deleted file mode 100644
index c50e708..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.streaming.runtime;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Integration tests for a streaming {@link OutputSelector}.
- */
-public class DirectedOutputITCase extends AbstractTestBase {
-
-	private static final String TEN = "ten";
-	private static final String ODD = "odd";
-	private static final String EVEN = "even";
-	private static final String NON_SELECTED = "nonSelected";
-
-	static final class MyOutputSelector implements OutputSelector<Long> {
-		private static final long serialVersionUID = 1L;
-
-		List<String> outputs = new ArrayList<String>();
-
-		@Override
-		public Iterable<String> select(Long value) {
-			outputs.clear();
-			if (value % 2 == 0) {
-				outputs.add(EVEN);
-			} else {
-				outputs.add(ODD);
-			}
-
-			if (value == 10L) {
-				outputs.add(TEN);
-			}
-
-			if (value == 11L) {
-				outputs.add(NON_SELECTED);
-			}
-			return outputs;
-		}
-	}
-
-	@Test
-	public void outputSelectorTest() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		TestListResultSink<Long> evenSink = new TestListResultSink<Long>();
-		TestListResultSink<Long> oddAndTenSink = new TestListResultSink<Long>();
-		TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
-		TestListResultSink<Long> allSink = new TestListResultSink<Long>();
-
-		SplitStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
-		source.select(EVEN).addSink(evenSink);
-		source.select(ODD, TEN).addSink(oddAndTenSink);
-		source.select(EVEN, ODD).addSink(evenAndOddSink);
-		source.addSink(allSink);
-
-		env.execute();
-		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), evenSink.getSortedResult());
-		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
-				evenAndOddSink.getSortedResult());
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
-				allSink.getSortedResult());
-	}
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index fbe6616..bc9980f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -32,8 +32,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -44,12 +44,12 @@ import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
-import org.apache.flink.test.streaming.runtime.util.EvenOddOutputSelector;
 import org.apache.flink.test.streaming.runtime.util.NoOpIntMap;
 import org.apache.flink.test.streaming.runtime.util.ReceiveCheckNoOpSink;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.OutputTag;
 
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -223,11 +223,25 @@ public class IterateITCase extends AbstractTestBase {
 		DataStreamSink<Integer> head3 = iter1.map(noOpIntMap).setParallelism(parallelism / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
 		DataStreamSink<Integer> head4 = iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
 
-		SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+		OutputTag<Integer> even = new OutputTag<Integer>("even") {};
+		OutputTag<Integer> odd = new OutputTag<Integer>("odd") {};
+		SingleOutputStreamOperator<Object> source3 = env.fromElements(1, 2, 3, 4, 5)
 				.map(noOpIntMap).name("EvenOddSourceMap")
-				.split(new EvenOddOutputSelector());
+				.process(new ProcessFunction<Integer, Object>() {
+					@Override
+					public void processElement(
+						Integer value,
+						Context ctx,
+						Collector<Object> out) throws Exception {
+						if (value % 2 == 0) {
+							ctx.output(even, value);
+						} else {
+							ctx.output(odd, value);
+						}
+					}
+				});
 
-		iter1.closeWith(source3.select("even").union(
+		iter1.closeWith(source3.getSideOutput(even).union(
 				head1.rebalance().map(noOpIntMap).broadcast(), head2.shuffle()));
 
 		StreamGraph graph = env.getStreamGraph();
@@ -263,7 +277,6 @@ public class IterateITCase extends AbstractTestBase {
 
 			if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("EvenOddSourceMap")) {
 				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
-				assertTrue(edge.getSelectedNames().contains("even"));
 			}
 		}
 
@@ -307,13 +320,27 @@ public class IterateITCase extends AbstractTestBase {
 				.addSink(new ReceiveCheckNoOpSink<Integer>());
 		DataStreamSink<Integer> head4 = iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
 
-		SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+		OutputTag<Integer> even = new OutputTag<Integer>("even") {};
+		OutputTag<Integer> odd = new OutputTag<Integer>("odd") {};
+		SingleOutputStreamOperator<Object> source3 = env.fromElements(1, 2, 3, 4, 5)
 				.map(noOpIntMap)
 				.name("split")
-				.split(new EvenOddOutputSelector());
+				.process(new ProcessFunction<Integer, Object>() {
+					@Override
+					public void processElement(
+							Integer value,
+							Context ctx,
+							Collector<Object> out) throws Exception {
+						if (value % 2 == 0) {
+							ctx.output(even, value);
+						} else {
+							ctx.output(odd, value);
+						}
+					}
+				});
 
 		iter1.closeWith(
-				source3.select("even").union(
+				source3.getSideOutput(even).union(
 						head1.map(noOpIntMap).name("bc").broadcast(),
 						head2.map(noOpIntMap).shuffle()));
 
@@ -345,7 +372,6 @@ public class IterateITCase extends AbstractTestBase {
 			String tailName = graph.getSourceVertex(edge).getOperatorName();
 			if (tailName.equals("split")) {
 				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
-				assertTrue(edge.getSelectedNames().contains("even"));
 			} else if (tailName.equals("bc")) {
 				assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
 			} else if (tailName.equals("shuffle")) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java
deleted file mode 100644
index 0a509b9..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.streaming.runtime;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Integration tests for a streaming {@link OutputSelector}.
- */
-public class OutputSplitterITCase extends AbstractTestBase {
-
-	private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testOnMergedDataStream() throws Exception {
-		TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
-		TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-		env.setBufferTimeout(1);
-
-		DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
-		DataStream<Integer> d2 = env.fromElements(1, 3, 5, 7, 9);
-
-		d1 = d1.union(d2);
-
-		d1.split(new OutputSelector<Integer>() {
-			private static final long serialVersionUID = 8354166915727490130L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value > 4) {
-					s.add(">");
-				} else {
-					s.add("<");
-				}
-				return s;
-			}
-		}).select(">").addSink(splitterResultSink1);
-
-		d1.split(new OutputSelector<Integer>() {
-			private static final long serialVersionUID = -6822487543355994807L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value % 3 == 0) {
-					s.add("yes");
-				} else {
-					s.add("no");
-				}
-				return s;
-			}
-		}).select("yes").addSink(splitterResultSink2);
-		env.execute();
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(5, 6, 7, 8, 9));
-		assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0, 3, 6, 9));
-		assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
-	}
-
-	@Test
-	public void testOnSingleDataStream() throws Exception {
-		TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
-		TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-		env.setBufferTimeout(1);
-
-		DataStream<Integer> ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-
-		ds.split(new OutputSelector<Integer>() {
-			private static final long serialVersionUID = 2524335410904414121L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value % 2 == 0) {
-					s.add("even");
-				} else {
-					s.add("odd");
-				}
-				return s;
-			}
-		}).select("even").addSink(splitterResultSink1);
-
-		ds.split(new OutputSelector<Integer>() {
-
-			private static final long serialVersionUID = -511693919586034092L;
-
-			@Override
-			public Iterable<String> select(Integer value) {
-				List<String> s = new ArrayList<String>();
-				if (value % 4 == 0) {
-					s.add("yes");
-				} else {
-					s.add("no");
-				}
-				return s;
-			}
-		}).select("yes").addSink(splitterResultSink2);
-		env.execute();
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0, 2, 4, 6, 8));
-		assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
-
-		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0, 4, 8));
-		assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
-	}
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java
deleted file mode 100644
index 804c19b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.streaming.runtime.util;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-import java.util.Collections;
-
-/**
- * {@link OutputSelector} mapping integers to "even" and "odd" streams.
- */
-public class EvenOddOutputSelector implements OutputSelector<Integer> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Iterable<String> select(Integer value) {
-		return value % 2 == 0 ? Collections.singleton("even") : Collections.singleton("odd");
-	}
-}