You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 15:28:26 UTC
[flink] 01/02: [FLINK-19083] Remove deprecated DataStream#split
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a08548a209167cafeeba1ce602fe8d542994be5
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Sep 4 11:37:47 2020 +0200
[FLINK-19083] Remove deprecated DataStream#split
This closes #13343
---
.../examples/iteration/IterateExample.java | 58 ++++----
flink-streaming-java/pom.xml | 4 +
.../collector/selector/CopyingDirectedOutput.java | 67 ---------
.../api/collector/selector/DirectedOutput.java | 163 ---------------------
.../api/collector/selector/OutputSelector.java | 46 ------
.../flink/streaming/api/datastream/DataStream.java | 23 +--
.../streaming/api/datastream/IterativeStream.java | 4 +-
.../api/datastream/SingleOutputStreamOperator.java | 19 ---
.../streaming/api/datastream/SplitStream.java | 67 ---------
.../flink/streaming/api/graph/StreamConfig.java | 22 ---
.../flink/streaming/api/graph/StreamEdge.java | 22 +--
.../flink/streaming/api/graph/StreamGraph.java | 54 +------
.../streaming/api/graph/StreamGraphGenerator.java | 72 ---------
.../flink/streaming/api/graph/StreamNode.java | 15 +-
.../api/graph/StreamingJobGraphGenerator.java | 1 -
.../api/transformations/SelectTransformation.java | 82 -----------
.../api/transformations/SplitTransformation.java | 82 -----------
.../streaming/runtime/tasks/OperatorChain.java | 47 ++----
.../apache/flink/streaming/api/DataStreamTest.java | 113 --------------
.../api/collector/OutputSelectorTest.java | 63 --------
.../api/datastream/SplitSideOutputTest.java | 78 ----------
.../api/graph/StreamGraphGeneratorTest.java | 115 +++------------
.../operators/StreamOperatorChainingTest.java | 26 ++--
.../runtime/tasks/OneInputStreamTaskTest.java | 3 -
.../runtime/tasks/StreamConfigChainer.java | 14 +-
.../tasks/StreamTaskMailboxTestHarnessBuilder.java | 8 +-
.../runtime/tasks/StreamTaskTestHarness.java | 7 +-
.../tasks/TwoInputStreamTaskTestHarness.java | 6 +-
.../streaming/util/EvenOddOutputSelector.java | 35 -----
.../flink/streaming/util/MockStreamConfig.java | 8 +-
flink-streaming-scala/pom.xml | 5 +
.../flink/streaming/api/scala/DataStream.scala | 34 +----
.../flink/streaming/api/scala/SplitStream.scala | 40 -----
.../apache/flink/streaming/api/scala/package.scala | 8 -
.../flink/streaming/api/scala/DataStreamTest.scala | 29 ----
.../scala/StreamingScalaAPICompletenessTest.scala | 13 +-
.../streaming/runtime/DirectedOutputITCase.java | 93 ------------
.../test/streaming/runtime/IterateITCase.java | 46 ++++--
.../streaming/runtime/OutputSplitterITCase.java | 147 -------------------
.../runtime/util/EvenOddOutputSelector.java | 35 -----
40 files changed, 151 insertions(+), 1623 deletions(-)
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index ecf57ee..7f80d3b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -21,15 +21,15 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Random;
/**
@@ -50,6 +50,9 @@ public class IterateExample {
private static final int BOUND = 100;
+ private static final OutputTag<Tuple5<Integer, Integer, Integer, Integer, Integer>> ITERATE_TAG =
+ new OutputTag<Tuple5<Integer, Integer, Integer, Integer, Integer>>("iterate") {};
+
// *************************************************************************
// PROGRAM
// *************************************************************************
@@ -84,19 +87,16 @@ public class IterateExample {
.iterate(5000L);
// apply the step function to get the next Fibonacci number
- // increment the counter and split the output with the output selector
- SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step())
- .split(new MySelector());
+ // increment the counter and split the output
+ SingleOutputStreamOperator<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.process(new Step());
// close the iteration by selecting the tuples that were directed to the
// 'iterate' channel in the output selector
- it.closeWith(step.select("iterate"));
+ it.closeWith(step.getSideOutput(ITERATE_TAG));
- // to produce the final output select the tuples directed to the
- // 'output' channel then get the input pairs that have the greatest iteration counter
+ // to produce the final output, get the input pairs that have the greatest iteration counter
// on a 1 second sliding window
- DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
- .map(new OutputMap());
+ DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.map(new OutputMap());
// emit results
if (params.has("output")) {
@@ -176,33 +176,27 @@ public class IterateExample {
/**
* Iteration step function that calculates the next Fibonacci number.
*/
- public static class Step implements
- MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer,
- Integer, Integer>> {
+ public static class Step
+ extends ProcessFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
private static final long serialVersionUID = 1L;
@Override
- public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
- Integer> value) throws Exception {
- return new Tuple5<>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);
- }
- }
-
- /**
- * OutputSelector testing which tuple needs to be iterated again.
- */
- public static class MySelector implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
- private static final long serialVersionUID = 1L;
+ public void processElement(
+ Tuple5<Integer, Integer, Integer, Integer, Integer> value,
+ Context ctx,
+ Collector<Tuple5<Integer, Integer, Integer, Integer, Integer>> out) throws Exception {
+ Tuple5<Integer, Integer, Integer, Integer, Integer> element = new Tuple5<>(
+ value.f0,
+ value.f1,
+ value.f3,
+ value.f2 + value.f3,
+ ++value.f4);
- @Override
- public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
- List<String> output = new ArrayList<>();
if (value.f2 < BOUND && value.f3 < BOUND) {
- output.add("iterate");
+ ctx.output(ITERATE_TAG, element);
} else {
- output.add("output");
+ out.collect(element);
}
- return output;
}
}
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index 5966de4..05ea145 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -113,6 +113,10 @@ under the License.
<exclude>org.apache.flink.streaming.api.datastream.WindowedStream#fold(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction)</exclude>
<exclude>org.apache.flink.streaming.api.datastream.WindowedStream#apply(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.streaming.api.functions.windowing.WindowFunction)</exclude>
<exclude>org.apache.flink.streaming.api.datastream.WindowedStream#apply(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.streaming.api.functions.windowing.WindowFunction,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+
+ <!-- DataStream#split was removed in 1.12 -->
+ <exclude>org.apache.flink.streaming.api.datastream.DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)</exclude>
+ <exclude>org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)</exclude>
</excludes>
</parameter>
</configuration>
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
deleted file mode 100644
index 3533cd1..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * Special version of {@link DirectedOutput} that performs a shallow copy of the
- * {@link StreamRecord} to ensure that multi-chaining works correctly.
- */
-public class CopyingDirectedOutput<OUT> extends DirectedOutput<OUT> {
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- public CopyingDirectedOutput(
- List<OutputSelector<OUT>> outputSelectors,
- List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
- super(outputSelectors, outputs);
- }
-
- @Override
- public void collect(StreamRecord<OUT> record) {
- Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
-
- if (selectedOutputs.isEmpty()) {
- return;
- }
-
- Iterator<Output<StreamRecord<OUT>>> it = selectedOutputs.iterator();
-
- while (true) {
- Output<StreamRecord<OUT>> out = it.next();
- if (it.hasNext()) {
- // we don't have the last output
- // perform a shallow copy
- StreamRecord<OUT> shallowCopy = record.copy(record.getValue());
- out.collect(shallowCopy);
- } else {
- // this is the last output
- out.collect(record);
- break;
- }
- }
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
deleted file mode 100644
index 2adeaf4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
-import org.apache.flink.util.OutputTag;
-import org.apache.flink.util.XORShiftRandom;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * Wrapping {@link Output} that forwards to other {@link Output Outputs } based on a list of
- * {@link OutputSelector OutputSelectors}.
- */
-public class DirectedOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
-
- protected final OutputSelector<OUT>[] outputSelectors;
-
- protected final Output<StreamRecord<OUT>>[] selectAllOutputs;
-
- protected final HashMap<String, Output<StreamRecord<OUT>>[]> outputMap;
-
- protected final Output<StreamRecord<OUT>>[] allOutputs;
-
- private final Random random = new XORShiftRandom();
-
- protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- public DirectedOutput(
- List<OutputSelector<OUT>> outputSelectors,
- List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
- this.outputSelectors = outputSelectors.toArray(new OutputSelector[outputSelectors.size()]);
-
- this.allOutputs = new Output[outputs.size()];
- for (int i = 0; i < outputs.size(); i++) {
- allOutputs[i] = outputs.get(i).f0;
- }
-
- HashSet<Output<StreamRecord<OUT>>> selectAllOutputs = new HashSet<Output<StreamRecord<OUT>>>();
- HashMap<String, ArrayList<Output<StreamRecord<OUT>>>> outputMap = new HashMap<String, ArrayList<Output<StreamRecord<OUT>>>>();
-
- for (Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge> outputPair : outputs) {
- final Output<StreamRecord<OUT>> output = outputPair.f0;
- final StreamEdge edge = outputPair.f1;
-
- List<String> selectedNames = edge.getSelectedNames();
-
- if (selectedNames.isEmpty()) {
- selectAllOutputs.add(output);
- }
- else {
- for (String selectedName : selectedNames) {
- if (!outputMap.containsKey(selectedName)) {
- outputMap.put(selectedName, new ArrayList<Output<StreamRecord<OUT>>>());
- outputMap.get(selectedName).add(output);
- }
- else {
- if (!outputMap.get(selectedName).contains(output)) {
- outputMap.get(selectedName).add(output);
- }
- }
- }
- }
- }
-
- this.selectAllOutputs = selectAllOutputs.toArray(new Output[selectAllOutputs.size()]);
-
- this.outputMap = new HashMap<>();
- for (Map.Entry<String, ArrayList<Output<StreamRecord<OUT>>>> entry : outputMap.entrySet()) {
- Output<StreamRecord<OUT>>[] arr = entry.getValue().toArray(new Output[entry.getValue().size()]);
- this.outputMap.put(entry.getKey(), arr);
- }
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- watermarkGauge.setCurrentWatermark(mark.getTimestamp());
- for (Output<StreamRecord<OUT>> out : allOutputs) {
- out.emitWatermark(mark);
- }
- }
-
- @Override
- public void emitLatencyMarker(LatencyMarker latencyMarker) {
- // randomly select an output
- allOutputs[random.nextInt(allOutputs.length)].emitLatencyMarker(latencyMarker);
- }
-
- protected Set<Output<StreamRecord<OUT>>> selectOutputs(StreamRecord<OUT> record) {
- Set<Output<StreamRecord<OUT>>> selectedOutputs = new HashSet<>(selectAllOutputs.length);
- Collections.addAll(selectedOutputs, selectAllOutputs);
-
- for (OutputSelector<OUT> outputSelector : outputSelectors) {
- Iterable<String> outputNames = outputSelector.select(record.getValue());
-
- for (String outputName : outputNames) {
- Output<StreamRecord<OUT>>[] outputList = outputMap.get(outputName);
- if (outputList != null) {
- Collections.addAll(selectedOutputs, outputList);
- }
- }
- }
-
- return selectedOutputs;
- }
-
- @Override
- public void collect(StreamRecord<OUT> record) {
- Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
-
- for (Output<StreamRecord<OUT>> out : selectedOutputs) {
- out.collect(record);
- }
- }
-
- @Override
- public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
- throw new UnsupportedOperationException("Cannot use split/select with side outputs.");
- }
-
- @Override
- public void close() {
- for (Output<StreamRecord<OUT>> out : allOutputs) {
- out.close();
- }
- }
-
- @Override
- public Gauge<Long> getWatermarkGauge() {
- return watermarkGauge;
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
deleted file mode 100644
index a8433fe..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-
-import java.io.Serializable;
-
-/**
- * Interface for defining an OutputSelector for a {@link SplitStream} using
- * the {@link SingleOutputStreamOperator#split} call. Every output object of a
- * {@link SplitStream} will run through this operator to select outputs.
- *
- * @param <OUT>
- * Type parameter of the split values.
- */
-@PublicEvolving
-public interface OutputSelector<OUT> extends Serializable {
- /**
- * Method for selecting output names for the emitted objects when using the
- * {@link SingleOutputStreamOperator#split} method. The values will be
- * emitted only to output names which are contained in the returned
- * iterable.
- *
- * @param value
- * Output object for which the output selection should be made.
- */
- Iterable<String> select(OUT value);
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5a54ea2..8198450 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -51,7 +51,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -97,6 +96,7 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
@@ -229,23 +229,6 @@ public class DataStream<T> {
}
/**
- * Operator used for directing tuples to specific named outputs using an
- * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
- * Calling this method on an operator creates a new {@link SplitStream}.
- *
- * @param outputSelector
- * The user defined
- * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
- * for directing the tuples.
- * @return The {@link SplitStream}
- * @deprecated Please use side output instead.
- */
- @Deprecated
- public SplitStream<T> split(OutputSelector<T> outputSelector) {
- return new SplitStream<>(this, clean(outputSelector));
- }
-
- /**
* Creates a new {@link ConnectedStreams} by connecting
* {@link DataStream} outputs of (possible) different types with each other.
* The DataStreams connected using this operator can be used with
@@ -526,7 +509,7 @@ public class DataStream<T> {
*
* <p>A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
- * {@link #split(OutputSelector)} for more information.
+ * {@link ProcessFunction.Context#output(OutputTag, Object)} for more information.
*
* <p>The iteration edge will be partitioned the same way as the first input of
* the iteration head unless it is changed in the
@@ -558,7 +541,7 @@ public class DataStream<T> {
*
* <p>A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
- * {@link #split(OutputSelector)} for more information.
+ * {@link ProcessFunction.Context#output(OutputTag, Object)} for more information.
*
* <p>The iteration edge will be partitioned the same way as the first input of
* the iteration head unless it is changed in the
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
index 430241a..228ec3a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -23,8 +23,10 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
+import org.apache.flink.util.OutputTag;
import java.util.Collection;
@@ -54,7 +56,7 @@ public class IterativeStream<T> extends SingleOutputStreamOperator<T> {
*
* <p>A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
- * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
+ * {@link ProcessFunction.Context#output(OutputTag, Object)}
* for more information.
*
* @param feedbackStream
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 6a70803..9f0dc04 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
@@ -57,8 +56,6 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
*/
private Map<OutputTag<?>, TypeInformation<?>> requestedSideOutputs = new HashMap<>();
- private boolean wasSplitApplied = false;
-
protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, Transformation<T> transformation) {
super(environment, transformation);
}
@@ -379,17 +376,6 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
return this;
}
- @Override
- public SplitStream<T> split(OutputSelector<T> outputSelector) {
- if (requestedSideOutputs.isEmpty()) {
- wasSplitApplied = true;
- return super.split(outputSelector);
- } else {
- throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
- "As a work-around, please add a no-op map function before the split() call.");
- }
- }
-
/**
* Gets the {@link DataStream} that contains the elements that are emitted from an operation
* into the side output with the given {@link OutputTag}.
@@ -397,11 +383,6 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
*/
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
- if (wasSplitApplied) {
- throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
- "As a work-around, please add a no-op map function before the split() call.");
- }
-
sideOutputTag = clean(requireNonNull(sideOutputTag));
// make a defensive copy
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
deleted file mode 100644
index 7f28dc7..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.transformations.SelectTransformation;
-import org.apache.flink.streaming.api.transformations.SplitTransformation;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-/**
- * The SplitStream represents an operator that has been split using an
- * {@link OutputSelector}. Named outputs can be selected using the
- * {@link #select} function. To apply transformation on the whole output simply
- * call the transformation on the SplitStream
- *
- * @param <OUT> The type of the elements in the Stream
- */
-
-@Deprecated
-@PublicEvolving
-public class SplitStream<OUT> extends DataStream<OUT> {
-
- protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
- super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
- }
-
- /**
- * Sets the output names for which the next operator will receive values.
- *
- * @param outputNames
- * The output names for which the operator will receive the
- * input.
- * @return Returns the selected DataStream
- */
- public DataStream<OUT> select(String... outputNames) {
- return selectOutput(outputNames);
- }
-
- private DataStream<OUT> selectOutput(String[] outputNames) {
- for (String outName : outputNames) {
- if (outName == null) {
- throw new RuntimeException("Selected names must not be null");
- }
- }
-
- SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
- return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
- }
-
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 0a23523..3b76227 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.util.ClassLoaderUtil;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -41,7 +40,6 @@ import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -74,7 +72,6 @@ public class StreamConfig implements Serializable {
private static final String CHAIN_INDEX = "chainIndex";
private static final String VERTEX_NAME = "vertexID";
private static final String ITERATION_ID = "iterationId";
- private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
private static final String INPUTS = "inputs";
private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_";
@@ -278,25 +275,6 @@ public class StreamConfig implements Serializable {
}
}
- public void setOutputSelectors(List<OutputSelector<?>> outputSelectors) {
- try {
- InstantiationUtil.writeObjectToConfig(outputSelectors, this.config, OUTPUT_SELECTOR_WRAPPER);
- } catch (IOException e) {
- throw new StreamTaskException("Could not serialize output selectors", e);
- }
- }
-
- public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader userCodeClassloader) {
- try {
- List<OutputSelector<T>> selectors =
- InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, userCodeClassloader);
- return selectors == null ? Collections.<OutputSelector<T>>emptyList() : selectors;
-
- } catch (Exception e) {
- throw new StreamTaskException("Could not read output selectors", e);
- }
- }
-
public void setIterationId(String iterationId) {
config.setString(ITERATION_ID, iterationId);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 70232ce..c4f32c5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.OutputTag;
import java.io.Serializable;
-import java.util.List;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -50,13 +49,6 @@ public class StreamEdge implements Serializable {
* The type number of the input for co-tasks.
*/
private final int typeNumber;
-
- /**
- * A list of output names that the target vertex listens to (if there is
- * output selection).
- */
- private final List<String> selectedNames;
-
/**
* The side-output tag (if any) of this {@link StreamEdge}.
*/
@@ -85,7 +77,6 @@ public class StreamEdge implements Serializable {
StreamNode sourceVertex,
StreamNode targetVertex,
int typeNumber,
- List<String> selectedNames,
StreamPartitioner<?> outputPartitioner,
OutputTag outputTag) {
@@ -94,7 +85,6 @@ public class StreamEdge implements Serializable {
targetVertex,
typeNumber,
ALWAYS_FLUSH_BUFFER_TIMEOUT,
- selectedNames,
outputPartitioner,
outputTag,
ShuffleMode.UNDEFINED);
@@ -104,7 +94,6 @@ public class StreamEdge implements Serializable {
StreamNode sourceVertex,
StreamNode targetVertex,
int typeNumber,
- List<String> selectedNames,
StreamPartitioner<?> outputPartitioner,
OutputTag outputTag,
ShuffleMode shuffleMode) {
@@ -114,7 +103,6 @@ public class StreamEdge implements Serializable {
targetVertex,
typeNumber,
sourceVertex.getBufferTimeout(),
- selectedNames,
outputPartitioner,
outputTag,
shuffleMode);
@@ -125,7 +113,6 @@ public class StreamEdge implements Serializable {
StreamNode targetVertex,
int typeNumber,
long bufferTimeout,
- List<String> selectedNames,
StreamPartitioner<?> outputPartitioner,
OutputTag outputTag,
ShuffleMode shuffleMode) {
@@ -134,13 +121,12 @@ public class StreamEdge implements Serializable {
this.targetId = targetVertex.getId();
this.typeNumber = typeNumber;
this.bufferTimeout = bufferTimeout;
- this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
this.outputTag = outputTag;
this.sourceOperatorName = sourceVertex.getOperatorName();
this.targetOperatorName = targetVertex.getOperatorName();
this.shuffleMode = checkNotNull(shuffleMode);
- this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames + "_" + outputPartitioner;
+ this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + outputPartitioner;
}
public int getSourceId() {
@@ -155,10 +141,6 @@ public class StreamEdge implements Serializable {
return typeNumber;
}
- public List<String> getSelectedNames() {
- return selectedNames;
- }
-
public OutputTag getOutputTag() {
return this.outputTag;
}
@@ -206,7 +188,7 @@ public class StreamEdge implements Serializable {
@Override
public String toString() {
return "(" + (sourceOperatorName + "-" + sourceId) + " -> " + (targetOperatorName + "-" + targetId)
- + ", typeNumber=" + typeNumber + ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
+ + ", typeNumber=" + typeNumber + ", outputPartitioner=" + outputPartitioner
+ ", bufferTimeout=" + bufferTimeout + ", outputTag=" + outputTag + ')';
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 7df5c28..c978435 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
@@ -106,7 +105,6 @@ public class StreamGraph implements Pipeline {
private Map<Integer, StreamNode> streamNodes;
private Set<Integer> sources;
private Set<Integer> sinks;
- private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, ShuffleMode>> virtualPartitionNodes;
@@ -129,7 +127,6 @@ public class StreamGraph implements Pipeline {
*/
public void clear() {
streamNodes = new HashMap<>();
- virtualSelectNodes = new HashMap<>();
virtualSideOutputNodes = new HashMap<>();
virtualPartitionNodes = new HashMap<>();
vertexIDtoBrokerID = new HashMap<>();
@@ -392,7 +389,6 @@ public class StreamGraph implements Pipeline {
coLocationGroup,
operatorFactory,
operatorName,
- new ArrayList<OutputSelector<?>>(),
vertexClass);
streamNodes.put(vertexID, vertex);
@@ -401,27 +397,6 @@ public class StreamGraph implements Pipeline {
}
/**
- * Adds a new virtual node that is used to connect a downstream vertex to only the outputs
- * with the selected names.
- *
- * <p>When adding an edge from the virtual node to a downstream node the connection will be made
- * to the original node, only with the selected names given here.
- *
- * @param originalId ID of the node that should be connected to.
- * @param virtualId ID of the virtual node.
- * @param selectedNames The selected names.
- */
- public void addVirtualSelectNode(Integer originalId, Integer virtualId, List<String> selectedNames) {
-
- if (virtualSelectNodes.containsKey(virtualId)) {
- throw new IllegalStateException("Already has virtual select node with id " + virtualId);
- }
-
- virtualSelectNodes.put(virtualId,
- new Tuple2<Integer, List<String>>(originalId, selectedNames));
- }
-
- /**
* Adds a new virtual node that is used to connect a downstream vertex to only the outputs with
* the selected side-output {@link OutputTag}.
*
@@ -488,9 +463,6 @@ public class StreamGraph implements Pipeline {
if (virtualSideOutputNodes.containsKey(id)) {
Integer mappedId = virtualSideOutputNodes.get(id).f0;
return getSlotSharingGroup(mappedId);
- } else if (virtualSelectNodes.containsKey(id)) {
- Integer mappedId = virtualSelectNodes.get(id).f0;
- return getSlotSharingGroup(mappedId);
} else if (virtualPartitionNodes.containsKey(id)) {
Integer mappedId = virtualPartitionNodes.get(id).f0;
return getSlotSharingGroup(mappedId);
@@ -526,14 +498,6 @@ public class StreamGraph implements Pipeline {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
- } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
- int virtualId = upStreamVertexID;
- upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
- if (outputNames.isEmpty()) {
- // selections that happen downstream override earlier selections
- outputNames = virtualSelectNodes.get(virtualId).f1;
- }
- addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
@@ -567,28 +531,14 @@ public class StreamGraph implements Pipeline {
shuffleMode = ShuffleMode.UNDEFINED;
}
- StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
+ StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber,
+ partitioner, outputTag, shuffleMode);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
- public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
- if (virtualPartitionNodes.containsKey(vertexID)) {
- addOutputSelector(virtualPartitionNodes.get(vertexID).f0, outputSelector);
- } else if (virtualSelectNodes.containsKey(vertexID)) {
- addOutputSelector(virtualSelectNodes.get(vertexID).f0, outputSelector);
- } else {
- getStreamNode(vertexID).addOutputSelector(outputSelector);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Outputselector set for {}", vertexID);
- }
- }
-
- }
-
public void setParallelism(Integer vertexID, int parallelism) {
if (getStreamNode(vertexID) != null) {
getStreamNode(vertexID).setParallelism(parallelism);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 0f6c0e7..d689c99 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -41,11 +41,9 @@ import org.apache.flink.streaming.api.transformations.LegacySourceTransformation
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
-import org.apache.flink.streaming.api.transformations.SelectTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.streaming.api.transformations.SplitTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
@@ -260,10 +258,6 @@ public class StreamGraphGenerator {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
- } else if (transform instanceof SplitTransformation<?>) {
- transformedIds = transformSplit((SplitTransformation<?>) transform);
- } else if (transform instanceof SelectTransformation<?>) {
- transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
@@ -352,56 +346,6 @@ public class StreamGraphGenerator {
}
/**
- * Transforms a {@code SplitTransformation}.
- *
- * <p>We add the output selector to previously transformed nodes.
- */
- private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
-
- Transformation<T> input = split.getInput();
- Collection<Integer> resultIds = transform(input);
-
- validateSplitTransformation(input);
-
- // the recursive transform call might have transformed this already
- if (alreadyTransformed.containsKey(split)) {
- return alreadyTransformed.get(split);
- }
-
- for (int inputId : resultIds) {
- streamGraph.addOutputSelector(inputId, split.getOutputSelector());
- }
-
- return resultIds;
- }
-
- /**
- * Transforms a {@code SelectTransformation}.
- *
- * <p>For this we create a virtual node in the {@code StreamGraph} holds the selected names.
- *
- * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
- */
- private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
- Transformation<T> input = select.getInput();
- Collection<Integer> resultIds = transform(input);
-
- // the recursive transform might have already transformed this
- if (alreadyTransformed.containsKey(select)) {
- return alreadyTransformed.get(select);
- }
-
- List<Integer> virtualResultIds = new ArrayList<>();
-
- for (int inputId : resultIds) {
- int virtualId = Transformation.getNewNodeId();
- streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
- virtualResultIds.add(virtualId);
- }
- return virtualResultIds;
- }
-
- /**
* Transforms a {@code SideOutputTransformation}.
*
* <p>For this we create a virtual node in the {@code StreamGraph} that holds the side-output
@@ -839,20 +783,4 @@ public class StreamGraphGenerator {
return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
}
}
-
- private <T> void validateSplitTransformation(Transformation<T> input) {
- if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
- throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
- } else if (input instanceof SideOutputTransformation) {
- throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
- } else if (input instanceof UnionTransformation) {
- for (Transformation<T> transformation : ((UnionTransformation<T>) input).getInputs()) {
- validateSplitTransformation(transformation);
- }
- } else if (input instanceof PartitionTransformation) {
- validateSplitTransformation(((PartitionTransformation) input).getInput());
- } else {
- return;
- }
- }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 8f25d6c..f36d95e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -69,7 +68,6 @@ public class StreamNode implements Serializable {
private TypeSerializer<?> stateKeySerializer;
private transient StreamOperatorFactory<?> operatorFactory;
- private List<OutputSelector<?>> outputSelectors;
private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer[0];
private TypeSerializer<?> typeSerializerOut;
@@ -91,10 +89,9 @@ public class StreamNode implements Serializable {
@Nullable String coLocationGroup,
StreamOperator<?> operator,
String operatorName,
- List<OutputSelector<?>> outputSelector,
Class<? extends AbstractInvokable> jobVertexClass) {
this(id, slotSharingGroup, coLocationGroup, SimpleOperatorFactory.of(operator),
- operatorName, outputSelector, jobVertexClass);
+ operatorName, jobVertexClass);
}
public StreamNode(
@@ -103,12 +100,10 @@ public class StreamNode implements Serializable {
@Nullable String coLocationGroup,
StreamOperatorFactory<?> operatorFactory,
String operatorName,
- List<OutputSelector<?>> outputSelector,
Class<? extends AbstractInvokable> jobVertexClass) {
this.id = id;
this.operatorName = operatorName;
this.operatorFactory = operatorFactory;
- this.outputSelectors = outputSelector;
this.jobVertexClass = jobVertexClass;
this.slotSharingGroup = slotSharingGroup;
this.coLocationGroup = coLocationGroup;
@@ -230,14 +225,6 @@ public class StreamNode implements Serializable {
return operatorName;
}
- public List<OutputSelector<?>> getOutputSelectors() {
- return outputSelectors;
- }
-
- public void addOutputSelector(OutputSelector<?> outputSelector) {
- this.outputSelectors.add(outputSelector);
- }
-
public void setSerializersIn(TypeSerializer<?> ...typeSerializersIn) {
checkArgument(typeSerializersIn.length > 0);
this.typeSerializersIn = typeSerializersIn;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 31eafd6..758f479 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -496,7 +496,6 @@ public class StreamingJobGraphGenerator {
}
config.setStreamOperatorFactory(vertex.getOperatorFactory());
- config.setOutputSelectors(vertex.getOutputSelectors());
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setNonChainedOutputs(nonChainableOutputs);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
deleted file mode 100644
index 7343700..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.transformations;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Transformation;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a selection of only certain upstream elements. This must
- * follow a {@link org.apache.flink.streaming.api.transformations.SplitTransformation} that
- * splits elements into several logical streams with assigned names.
- *
- * <p>This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code SelectTransformation}
- */
-@Internal
-public class SelectTransformation<T> extends Transformation<T> {
-
- private final Transformation<T> input;
- private final List<String> selectedNames;
-
- /**
- * Creates a new {@code SelectionTransformation} from the given input that only selects
- * the streams with the selected names.
- *
- * @param input The input {@code Transformation}
- * @param selectedNames The names from the upstream {@code SplitTransformation} that this
- * {@code SelectTransformation} selects.
- */
- public SelectTransformation(
- Transformation<T> input,
- List<String> selectedNames) {
- super("Select", input.getOutputType(), input.getParallelism());
- this.input = input;
- this.selectedNames = selectedNames;
- }
-
- /**
- * Returns the input {@code Transformation}.
- */
- public Transformation<T> getInput() {
- return input;
- }
-
- /**
- * Returns the names of the split streams that this {@code SelectTransformation} selects.
- */
- public List<String> getSelectedNames() {
- return selectedNames;
- }
-
- @Override
- public Collection<Transformation<?>> getTransitivePredecessors() {
- List<Transformation<?>> result = Lists.newArrayList();
- result.add(this);
- result.addAll(input.getTransitivePredecessors());
- return result;
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
deleted file mode 100644
index 2ff312d..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.transformations;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a split of one
- * {@link org.apache.flink.streaming.api.datastream.DataStream} into several {@code DataStreams}
- * using an {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
- *
- * <p>This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code SplitTransformation}
- */
-@Internal
-public class SplitTransformation<T> extends Transformation<T> {
-
- private final Transformation<T> input;
-
- private final OutputSelector<T> outputSelector;
-
- /**
- * Creates a new {@code SplitTransformation} from the given input and {@code OutputSelector}.
- *
- * @param input The input {@code Transformation}
- * @param outputSelector The output selector
- */
- public SplitTransformation(
- Transformation<T> input,
- OutputSelector<T> outputSelector) {
- super("Split", input.getOutputType(), input.getParallelism());
- this.input = input;
- this.outputSelector = outputSelector;
- }
-
- /**
- * Returns the input {@code Transformation}.
- */
- public Transformation<T> getInput() {
- return input;
- }
-
- /**
- * Returns the {@code OutputSelector}.
- */
- public OutputSelector<T> getOutputSelector() {
- return outputSelector;
- }
-
- @Override
- public Collection<Transformation<?>> getTransitivePredecessors() {
- List<Transformation<?>> result = Lists.newArrayList();
- result.add(this);
- result.addAll(input.getTransitivePredecessors());
- return result;
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 6efb5dd..ebfa615 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -31,9 +31,6 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
-import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
import org.apache.flink.streaming.api.graph.StreamConfig.SourceInputConfig;
@@ -516,47 +513,25 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
allOutputs.add(new Tuple2<>(output, outputEdge));
}
- // if there are multiple outputs, or the outputs are directed, we need to
- // wrap them as one output
-
- List<OutputSelector<T>> selectors = operatorConfig.getOutputSelectors(userCodeClassloader);
-
- if (selectors == null || selectors.isEmpty()) {
- // simple path, no selector necessary
- if (allOutputs.size() == 1) {
- return allOutputs.get(0).f0;
- }
- else {
- // send to N outputs. Note that this includes the special case
- // of sending to zero outputs
- @SuppressWarnings({"unchecked", "rawtypes"})
- Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
- for (int i = 0; i < allOutputs.size(); i++) {
- asArray[i] = allOutputs.get(i).f0;
- }
-
- // This is the inverse of creating the normal ChainingOutput.
- // If the chaining output does not copy we need to copy in the broadcast output,
- // otherwise multi-chaining would not work correctly.
- if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
- return new CopyingBroadcastingOutputCollector<>(asArray, this);
- } else {
- return new BroadcastingOutputCollector<>(asArray, this);
- }
+ if (allOutputs.size() == 1) {
+ return allOutputs.get(0).f0;
+ } else {
+ // send to N outputs. Note that this includes the special case
+ // of sending to zero outputs
+ @SuppressWarnings({"unchecked"})
+ Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
+ for (int i = 0; i < allOutputs.size(); i++) {
+ asArray[i] = allOutputs.get(i).f0;
}
- }
- else {
- // selector present, more complex routing necessary
// This is the inverse of creating the normal ChainingOutput.
// If the chaining output does not copy we need to copy in the broadcast output,
// otherwise multi-chaining would not work correctly.
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
- return new CopyingDirectedOutput<>(selectors, allOutputs);
+ return new CopyingBroadcastingOutputCollector<>(asArray, this);
} else {
- return new DirectedOutput<>(selectors, allOutputs);
+ return new BroadcastingOutputCollector<>(asArray, this);
}
-
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 4ae3218..d11c07b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -41,7 +41,6 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
@@ -50,7 +49,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@@ -83,7 +81,6 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.hamcrest.core.StringStartsWith;
@@ -996,24 +993,6 @@ public class DataStreamTest extends TestLogger {
fail(e.getMessage());
}
- OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
- SplitStream<Integer> split = unionFilter.split(outputSelector);
- split.select("dummy").addSink(new DiscardingSink<Integer>());
- List<OutputSelector<?>> outputSelectors = getStreamGraph(env).getStreamNode(unionFilter.getId()).getOutputSelectors();
- assertEquals(1, outputSelectors.size());
- assertEquals(outputSelector, outputSelectors.get(0));
-
- DataStream<Integer> select = split.select("a");
- DataStreamSink<Integer> sink = select.print();
-
- StreamEdge splitEdge = getStreamGraph(env).getStreamEdges(unionFilter.getId(), sink.getTransformation().getId()).get(0);
- assertEquals("a", splitEdge.getSelectedNames().get(0));
-
- DataStreamSink<Integer> sinkWithIdentifier = select.print("identifier");
- StreamEdge newSplitEdge = getStreamGraph(env).getStreamEdges(unionFilter.getId(), sinkWithIdentifier.getTransformation().getId()).get(0);
- assertEquals("a", newSplitEdge.getSelectedNames().get(0));
-
ConnectedStreams<Integer, Integer> connect = map.connect(flatMap);
CoMapFunction<Integer, Integer, String> coMapper = new CoMapFunction<Integer, Integer, String>() {
private static final long serialVersionUID = 1L;
@@ -1147,91 +1126,6 @@ public class DataStreamTest extends TestLogger {
}
/////////////////////////////////////////////////////////////
- // Split testing
- /////////////////////////////////////////////////////////////
-
- @Test
- public void testConsecutiveSplitRejection() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Integer> src = env.fromElements(0, 0);
-
- OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
- src.split(outputSelector).split(outputSelector).addSink(new DiscardingSink<>());
-
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
-
- getStreamGraph(env);
- }
-
- @Test
- public void testSplitAfterSideOutputRejection() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Integer> src = env.fromElements(0, 0);
-
- OutputTag<Integer> outputTag = new OutputTag<Integer>("dummy"){};
- OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
- src.getSideOutput(outputTag).split(outputSelector).addSink(new DiscardingSink<>());
-
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
-
- getStreamGraph(env);
- }
-
- @Test
- public void testSelectBetweenConsecutiveSplitRejection() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Integer> src = env.fromElements(0, 0);
-
- OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
- src.split(outputSelector).select("dummy").split(outputSelector).addSink(new DiscardingSink<>());
-
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
-
- getStreamGraph(env);
- }
-
- @Test
- public void testUnionBetweenConsecutiveSplitRejection() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Integer> src = env.fromElements(0, 0);
-
- OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
- src.split(outputSelector).select("dummy").union(src.map(x -> x)).split(outputSelector).addSink(new DiscardingSink<>());
-
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
-
- getStreamGraph(env);
- }
-
- @Test
- public void testKeybyBetweenConsecutiveSplitRejection() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Integer> src = env.fromElements(0, 0);
-
- OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
-
- src.split(outputSelector).select("dummy").keyBy(x -> x).split(outputSelector).addSink(new DiscardingSink<>());
-
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
-
- getStreamGraph(env);
- }
-
- /////////////////////////////////////////////////////////////
// KeyBy testing
/////////////////////////////////////////////////////////////
@@ -1592,11 +1486,4 @@ public class DataStreamTest extends TestLogger {
private enum TestEnum {
FOO, BAR
}
-
- private class DummyOutputSelector<Integer> implements OutputSelector<Integer> {
- @Override
- public Iterable<String> select(Integer value) {
- return null;
- }
- }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
deleted file mode 100644
index a2f8ed6..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for {@link OutputSelector}.
- */
-public class OutputSelectorTest {
-
- static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Iterable<String> select(Tuple1<Integer> tuple) {
-
- String[] outputs = new String[tuple.f0];
-
- for (Integer i = 0; i < tuple.f0; i++) {
- outputs[i] = i.toString();
- }
- return Arrays.asList(outputs);
- }
- }
-
- @Test
- public void testGetOutputs() {
- OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
- List<String> expectedOutputs = new ArrayList<String>();
- expectedOutputs.add("0");
- expectedOutputs.add("1");
- assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(2)));
- expectedOutputs.add("2");
- assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(3)));
- }
-
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
deleted file mode 100644
index 8f33e19..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-
-/**
- * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}.
- */
-public class SplitSideOutputTest {
-
- private static final OutputTag<String> outputTag = new OutputTag<String>("outputTag") {};
-
- @Test
- public void testSideOutputAfterSelectIsForbidden() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
- .process(new DummyProcessFunction());
-
- processInput.split(Collections::singleton);
-
- try {
- processInput.getSideOutput(outputTag);
- Assert.fail("Should have failed early with an exception.");
- } catch (UnsupportedOperationException expected){
- // expected
- }
- }
-
- @Test
- public void testSelectAfterSideOutputIsForbidden() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
- .process(new DummyProcessFunction());
-
- processInput.getSideOutput(outputTag);
-
- try {
- processInput.split(Collections::singleton);
- Assert.fail("Should have failed early with an exception.");
- } catch (UnsupportedOperationException expected){
- // expected
- }
- }
-
- private static final class DummyProcessFunction extends ProcessFunction<String, String> {
-
- @Override
- public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
- }
- }
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 3d0f502..9124c49 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -54,7 +54,6 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.util.EvenOddOutputSelector;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.util.TestLogger;
@@ -150,50 +149,38 @@ public class StreamGraphGeneratorTest extends TestLogger {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source = env.fromElements(1, 10);
-
DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
// verify that only the partitioning that was set last is used
DataStream<Integer> broadcastMap = rebalanceMap
- .forward()
- .global()
- .broadcast()
- .map(new NoOpIntMap());
+ .forward()
+ .global()
+ .broadcast()
+ .map(new NoOpIntMap());
broadcastMap.addSink(new DiscardingSink<>());
- // verify that partitioning is preserved across union and split/select
- EvenOddOutputSelector selector1 = new EvenOddOutputSelector();
- EvenOddOutputSelector selector2 = new EvenOddOutputSelector();
- EvenOddOutputSelector selector3 = new EvenOddOutputSelector();
-
- DataStream<Integer> map1Operator = rebalanceMap
- .map(new NoOpIntMap());
+ DataStream<Integer> broadcastOperator = rebalanceMap
+ .map(new NoOpIntMap())
+ .name("broadcast");
- DataStream<Integer> map1 = map1Operator
- .broadcast()
- .split(selector1)
- .select("even");
+ DataStream<Integer> map1 = broadcastOperator.broadcast();
- DataStream<Integer> map2Operator = rebalanceMap
- .map(new NoOpIntMap());
+ DataStream<Integer> globalOperator = rebalanceMap
+ .map(new NoOpIntMap())
+ .name("global");
- DataStream<Integer> map2 = map2Operator
- .split(selector2)
- .select("odd")
- .global();
+ DataStream<Integer> map2 = globalOperator.global();
- DataStream<Integer> map3Operator = rebalanceMap
- .map(new NoOpIntMap());
+ DataStream<Integer> shuffleOperator = rebalanceMap
+ .map(new NoOpIntMap())
+ .name("shuffle");
- DataStream<Integer> map3 = map3Operator
- .global()
- .split(selector3)
- .select("even")
- .shuffle();
+ DataStream<Integer> map3 = shuffleOperator.shuffle();
SingleOutputStreamOperator<Integer> unionedMap = map1.union(map2).union(map3)
- .map(new NoOpIntMap());
+ .map(new NoOpIntMap())
+ .name("union");
unionedMap.addSink(new DiscardingSink<>());
@@ -206,68 +193,10 @@ public class StreamGraphGeneratorTest extends TestLogger {
assertTrue(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
assertEquals(rebalanceMap.getId(), graph.getSourceVertex(graph.getStreamNode(broadcastMap.getId()).getInEdges().get(0)).getId());
- // verify that partitioning in unions is preserved and that it works across split/select
- assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
- assertTrue(graph.getStreamNode(map1Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
- assertTrue(graph.getStreamNode(map1Operator.getId()).getOutputSelectors().contains(selector1));
-
- assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof GlobalPartitioner);
- assertTrue(graph.getStreamNode(map2Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("odd"));
- assertTrue(graph.getStreamNode(map2Operator.getId()).getOutputSelectors().contains(selector2));
-
- assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getPartitioner() instanceof ShufflePartitioner);
- assertTrue(graph.getStreamNode(map3Operator.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("even"));
- assertTrue(graph.getStreamNode(map3Operator.getId()).getOutputSelectors().contains(selector3));
- }
-
- /**
- * This tests whether virtual Transformations behave correctly.
- *
- * <p>Checks whether output selector, partitioning works correctly when applied on a union.
- */
- @Test
- public void testVirtualTransformations2() throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Integer> source = env.fromElements(1, 10);
-
- DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());
-
- DataStream<Integer> map1 = rebalanceMap
- .map(new NoOpIntMap());
-
- DataStream<Integer> map2 = rebalanceMap
- .map(new NoOpIntMap());
-
- DataStream<Integer> map3 = rebalanceMap
- .map(new NoOpIntMap());
-
- EvenOddOutputSelector selector = new EvenOddOutputSelector();
-
- SingleOutputStreamOperator<Integer> unionedMap = map1.union(map2).union(map3)
- .broadcast()
- .split(selector)
- .select("foo")
- .map(new NoOpIntMap());
-
- unionedMap.addSink(new DiscardingSink<>());
-
- StreamGraph graph = env.getStreamGraph();
-
- // verify that the properties are correctly set on all input operators
- assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
- assertTrue(graph.getStreamNode(map1.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
- assertTrue(graph.getStreamNode(map1.getId()).getOutputSelectors().contains(selector));
-
- assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
- assertTrue(graph.getStreamNode(map2.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
- assertTrue(graph.getStreamNode(map2.getId()).getOutputSelectors().contains(selector));
-
- assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
- assertTrue(graph.getStreamNode(map3.getId()).getOutEdges().get(0).getSelectedNames().get(0).equals("foo"));
- assertTrue(graph.getStreamNode(map3.getId()).getOutputSelectors().contains(selector));
-
+ // verify that partitioning in unions is preserved
+ assertTrue(graph.getStreamNode(broadcastOperator.getId()).getOutEdges().get(0).getPartitioner() instanceof BroadcastPartitioner);
+ assertTrue(graph.getStreamNode(globalOperator.getId()).getOutEdges().get(0).getPartitioner() instanceof GlobalPartitioner);
+ assertTrue(graph.getStreamNode(shuffleOperator.getId()).getOutEdges().get(0).getPartitioner() instanceof ShufflePartitioner);
}
/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index b908144..d9d5174 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
@@ -39,12 +39,13 @@ import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -190,20 +191,25 @@ public class StreamOperatorChainingTest {
input = input
.map(value -> value);
- SplitStream<Integer> split = input.split(new OutputSelector<Integer>() {
+ OutputTag<Integer> oneOutput = new OutputTag<Integer>("one") {};
+ OutputTag<Integer> otherOutput = new OutputTag<Integer>("other") {};
+ SingleOutputStreamOperator<Object> split = input.process(new ProcessFunction<Integer, Object>() {
private static final long serialVersionUID = 1L;
@Override
- public Iterable<String> select(Integer value) {
+ public void processElement(
+ Integer value,
+ Context ctx, Collector<Object> out) throws Exception {
if (value.equals(1)) {
- return Collections.singletonList("one");
+ ctx.output(oneOutput, value);
} else {
- return Collections.singletonList("other");
+ ctx.output(otherOutput, value);
}
+
}
});
- split.select("one")
+ split.getSideOutput(oneOutput)
.map(value -> "First 1: " + value)
.addSink(new SinkFunction<String>() {
@@ -213,7 +219,7 @@ public class StreamOperatorChainingTest {
}
});
- split.select("one")
+ split.getSideOutput(oneOutput)
.map(value -> "First 2: " + value)
.addSink(new SinkFunction<String>() {
@@ -223,7 +229,7 @@ public class StreamOperatorChainingTest {
}
});
- split.select("other")
+ split.getSideOutput(otherOutput)
.map(value -> "Second: " + value)
.addSink(new SinkFunction<String>() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 9ed30d4..5182651 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -856,7 +856,6 @@ public class OneInputStreamTaskTest extends TestLogger {
null,
(StreamOperator<?>) null,
null,
- null,
null
),
new StreamNode(
@@ -865,11 +864,9 @@ public class OneInputStreamTaskTest extends TestLogger {
null,
(StreamOperator<?>) null,
null,
- null,
null
),
0,
- Collections.emptyList(),
null,
null
);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 6164163..5197dc4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -126,10 +126,9 @@ public class StreamConfigChainer<OWNER> {
tailConfig.setChainedOutputs(Collections.singletonList(
new StreamEdge(
- new StreamNode(tailConfig.getChainIndex(), null, null, (StreamOperator<?>) null, null, null, null),
- new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null),
+ new StreamNode(tailConfig.getChainIndex(), null, null, (StreamOperator<?>) null, null, null),
+ new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null),
0,
- Collections.<String>emptyList(),
null,
null)));
tailConfig = new StreamConfig(new Configuration());
@@ -153,15 +152,13 @@ public class StreamConfigChainer<OWNER> {
List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
outEdgesInOrder.add(
new StreamEdge(
- new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null),
- new StreamNode(chainIndex , null, null, (StreamOperator<?>) null, null, null, null),
+ new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null),
+ new StreamNode(chainIndex , null, null, (StreamOperator<?>) null, null, null),
0,
- Collections.<String>emptyList(),
new BroadcastPartitioner<Object>(),
null));
tailConfig.setChainEnd();
- tailConfig.setOutputSelectors(Collections.emptyList());
tailConfig.setNumberOfOutputs(1);
tailConfig.setOutEdgesInOrder(outEdgesInOrder);
tailConfig.setNonChainedOutputs(outEdgesInOrder);
@@ -186,7 +183,6 @@ public class StreamConfigChainer<OWNER> {
null,
dummyOperator,
"source dummy",
- new LinkedList<>(),
SourceStreamTask.class);
StreamNode targetVertexDummy = new StreamNode(
MAIN_NODE_ID + 1,
@@ -194,14 +190,12 @@ public class StreamConfigChainer<OWNER> {
null,
dummyOperator,
"target dummy",
- new LinkedList<>(),
SourceStreamTask.class);
outEdgesInOrder.add(new StreamEdge(
sourceVertexDummy,
targetVertexDummy,
0,
- new LinkedList<>(),
new BroadcastPartitioner<>(),
null));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index 0ed1a41..af4308a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -165,7 +165,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
inputGates = new StreamTestSingleInputGate[inputChannelsPerGate.size()];
List<StreamEdge> inPhysicalEdges = new LinkedList<>();
- StreamNode mainNode = new StreamNode(StreamConfigChainer.MAIN_NODE_ID, null, null, (StreamOperator<?>) null, null, null, null);
+ StreamNode mainNode = new StreamNode(StreamConfigChainer.MAIN_NODE_ID, null, null, (StreamOperator<?>) null, null, null);
for (int i = 0; i < inputs.size(); i++) {
if ((inputs.get(i) instanceof NetworkInputConfig)) {
NetworkInputConfig networkInput = (NetworkInputConfig) inputs.get(i);
@@ -199,12 +199,11 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
inputSerializer,
bufferSize);
- StreamNode sourceVertexDummy = new StreamNode(0, null, null, (StreamOperator<?>) null, null, null, SourceStreamTask.class);
+ StreamNode sourceVertexDummy = new StreamNode(0, null, null, (StreamOperator<?>) null, null, SourceStreamTask.class);
StreamEdge streamEdge = new StreamEdge(
sourceVertexDummy,
targetVertexDummy,
gateIndex + 1,
- new LinkedList<>(),
new BroadcastPartitioner<>(),
null);
@@ -222,10 +221,9 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
List<StreamEdge> outEdgesInOrder = new LinkedList<>();
StreamEdge sourceToMainEdge = new StreamEdge(
- new StreamNode(maxNodeId + inputId + 1337, null, null, (StreamOperator<?>) null, null, null, null),
+ new StreamNode(maxNodeId + inputId + 1337, null, null, (StreamOperator<?>) null, null, null),
mainNode,
0,
- new LinkedList<>(),
new ForwardPartitioner<>(),
null);
outEdgesInOrder.add(sourceToMainEdge);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index e0ed885..175a244 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -202,7 +202,6 @@ public class StreamTaskTestHarness<OUT> {
setupCalled = true;
streamConfig.setChainStart();
streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
- streamConfig.setOutputSelectors(Collections.emptyList());
streamConfig.setNumberOfOutputs(1);
streamConfig.setTypeSerializerOut(outputSerializer);
streamConfig.setVertexID(0);
@@ -213,10 +212,10 @@ public class StreamTaskTestHarness<OUT> {
};
List<StreamEdge> outEdgesInOrder = new LinkedList<>();
- StreamNode sourceVertexDummy = new StreamNode(0, "group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
- StreamNode targetVertexDummy = new StreamNode(1, "group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);
+ StreamNode sourceVertexDummy = new StreamNode(0, "group", null, dummyOperator, "source dummy", SourceStreamTask.class);
+ StreamNode targetVertexDummy = new StreamNode(1, "group", null, dummyOperator, "target dummy", SourceStreamTask.class);
- outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<>(), new BroadcastPartitioner<>(), null /* output tag */));
+ outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new BroadcastPartitioner<>(), null /* output tag */));
streamConfig.setOutEdgesInOrder(outEdgesInOrder);
streamConfig.setNonChainedOutputs(outEdgesInOrder);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index 719f16f..df1bc59 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -110,8 +110,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
private static final long serialVersionUID = 1L;
};
- StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
- StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);
+ StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", SourceStreamTask.class);
+ StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", SourceStreamTask.class);
for (int i = 0; i < numInputGates; i++) {
@@ -126,7 +126,6 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
targetVertexDummy,
1,
- new LinkedList<>(),
new BroadcastPartitioner<>(),
null /* output tag */);
@@ -143,7 +142,6 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
targetVertexDummy,
2,
- new LinkedList<>(),
new BroadcastPartitioner<>(),
null /* output tag */);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
deleted file mode 100644
index 26da5d3..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-import java.util.Arrays;
-
-/**
- * Tests for {@link OutputSelector}.
- */
-public class EvenOddOutputSelector implements OutputSelector<Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Iterable<String> select(Integer value) {
- return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
- }
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
index ec31592..bbfa4bb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
/**
@@ -41,7 +40,6 @@ public class MockStreamConfig extends StreamConfig {
super(configuration);
setChainStart();
- setOutputSelectors(Collections.emptyList());
setNumberOfOutputs(numberOfOutputs);
setTypeSerializerOut(new StringSerializer());
setVertexID(0);
@@ -52,13 +50,13 @@ public class MockStreamConfig extends StreamConfig {
private static final long serialVersionUID = 1L;
};
- StreamNode sourceVertex = new StreamNode(0, null, null, dummyOperator, "source", new ArrayList<>(), SourceStreamTask.class);
- StreamNode targetVertex = new StreamNode(1, null, null, dummyOperator, "target", new ArrayList<>(), SourceStreamTask.class);
+ StreamNode sourceVertex = new StreamNode(0, null, null, dummyOperator, "source", SourceStreamTask.class);
+ StreamNode targetVertex = new StreamNode(1, null, null, dummyOperator, "target", SourceStreamTask.class);
List<StreamEdge> outEdgesInOrder = new ArrayList<>(numberOfOutputs);
for (int i = 0; i < numberOfOutputs; i++) {
outEdgesInOrder.add(
- new StreamEdge(sourceVertex, targetVertex, numberOfOutputs, new ArrayList<>(), new BroadcastPartitioner<>(), null));
+ new StreamEdge(sourceVertex, targetVertex, numberOfOutputs, new BroadcastPartitioner<>(), null));
}
setOutEdgesInOrder(outEdgesInOrder);
setNonChainedOutputs(outEdgesInOrder);
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index c6295cc..9200cef 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -278,6 +278,11 @@ under the License.
<exclude>org.apache.flink.streaming.api.scala.WindowedStream#fold(java.lang.Object,scala.Function2,scala.Function4,org.apache.flink.api.common.typeinfo.TypeInformation,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
<exclude>org.apache.flink.streaming.api.scala.AllWindowedStream#fold(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
<exclude>org.apache.flink.streaming.api.scala.WindowedStream#fold(java.lang.Object,org.apache.flink.api.common.functions.FoldFunction,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
+
+ <!-- DataStream#split was removed in 1.12 -->
+ <exclude>org.apache.flink.streaming.api.scala.DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)</exclude>
+ <exclude>org.apache.flink.streaming.api.scala.DataStream#split(scala.Function1)</exclude>
+ <exclude>org.apache.flink.streaming.api.scala.SplitStream</exclude>
</excludes>
</parameter>
</configuration>
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0608c53..7bd08d4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -32,7 +32,6 @@ import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
import org.apache.flink.core.fs.{FileSystem, Path}
-import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.timestamps.{AscendingTimestampExtractor, BoundedOutOfOrdernessTimestampExtractor}
@@ -559,7 +558,7 @@ class DataStream[T](stream: JavaStream[T]) {
* stepfunction: initialStream => (feedback, output)
*
* A common pattern is to use output splitting to create feedback and output DataStream.
- * Please refer to the [[split]] method of the DataStream
+ * Please see the side outputs of [[ProcessFunction]], used via the DataStream's process method
*
* By default a DataStream with iteration will never terminate, but the user
* can use the maxWaitTime parameter to set a max waiting time for the iteration head.
@@ -898,37 +897,6 @@ class DataStream[T](stream: JavaStream[T]) {
}
/**
- *
- * Operator used for directing tuples to specific named outputs using an
- * OutputSelector. Calling this method on an operator creates a new
- * [[SplitStream]].
- *
- * @deprecated Please use side output instead.
- */
- @deprecated
- def split(selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))
-
- /**
- * Creates a new [[SplitStream]] that contains only the elements satisfying the
- * given output selector predicate.
- *
- * @deprecated Please use side output instead.
- */
- @deprecated
- def split(fun: T => TraversableOnce[String]): SplitStream[T] = {
- if (fun == null) {
- throw new NullPointerException("OutputSelector must not be null.")
- }
- val cleanFun = clean(fun)
- val selector = new OutputSelector[T] {
- def select(in: T): java.lang.Iterable[String] = {
- cleanFun(in).toIterable.asJava
- }
- }
- split(selector)
- }
-
- /**
* Creates a co-group operation. See [[CoGroupedStreams]] for an example of how the keys
* and window can be specified.
*/
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
deleted file mode 100644
index 16a5a48..0000000
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.annotation.Public
-import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
-
-/**
- * The SplitStream represents an operator that has been split using an
- * [[org.apache.flink.streaming.api.collector.selector.OutputSelector]].
- * Named outputs can be selected using the [[SplitStream#select()]] function.
- * To apply a transformation on the whole output simply call
- * the appropriate method on this stream.
- */
-@deprecated("Please use side outputs instead of split/select", "deprecated since 1.8.2")
-@Public
-class SplitStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){
-
- /**
- * Sets the output names for which the next operator will receive values.
- */
- def select(outputNames: String*): DataStream[T] =
- asScalaStream(javaStream.select(outputNames: _*))
-}
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index ef96fd1..ffc3d09 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -17,12 +17,10 @@
*/
package org.apache.flink.streaming.api
-
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreator}
import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams }
import org.apache.flink.streaming.api.datastream.{ BroadcastConnectedStream => BroadcastConnectedJavaStreams }
import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream }
@@ -51,12 +49,6 @@ package object scala {
= new KeyedStream[R, K](stream)
/**
- * Converts an [[org.apache.flink.streaming.api.datastream.SplitStream]] to a
- * [[org.apache.flink.streaming.api.scala.SplitStream]].
- */
- private[flink] def asScalaStream[R](stream: SplitJavaStream[R])
- = new SplitStream[R](stream)
- /**
* Converts an [[org.apache.flink.streaming.api.datastream.ConnectedStreams]] to a
* [[org.apache.flink.streaming.api.scala.ConnectedStreams]].
*/
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 3d7cbb2..d90dd25 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.ParallelIteratorInputFormat
import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
@@ -572,34 +571,6 @@ class DataStreamTest extends AbstractTestBase {
}
}
- val outputSelector = new OutputSelector[Int] {
- override def select(value: Int): lang.Iterable[String] = null
- }
-
- val split = unionFilter.split(outputSelector)
- split.print()
- val outputSelectors = getStreamGraph(env).getStreamNode(unionFilter.getId).getOutputSelectors
- assert(1 == outputSelectors.size)
- assert(outputSelector == outputSelectors.get(0))
-
- unionFilter.split(x => List("a")).print()
- val moreOutputSelectors = getStreamGraph(env)
- .getStreamNode(unionFilter.getId)
- .getOutputSelectors
- assert(2 == moreOutputSelectors.size)
-
- val select = split.select("a")
- val sink = select.print()
- val splitEdge =
- getStreamGraph(env).getStreamEdgesOrThrow(unionFilter.getId, sink.getTransformation.getId)
- assert("a" == splitEdge.get(0).getSelectedNames.get(0))
-
- val sinkWithIdentifier = select.print("identifier")
- val newSplitEdge = getStreamGraph(env).getStreamEdgesOrThrow(
- unionFilter.getId,
- sinkWithIdentifier.getTransformation.getId)
- assert("a" == newSplitEdge.get(0).getSelectedNames.get(0))
-
val connect = map.connect(flatMap)
val coMapFunction =
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index d8737e1..0903653 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -17,15 +17,15 @@
*/
package org.apache.flink.streaming.api.scala
-import java.lang.reflect.Method
-
import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import scala.language.existentials
-
import org.junit.Test
+import java.lang.reflect.Method
+
+import scala.language.existentials
+
/**
* This checks whether the streaming Scala API is up to feature parity with the Java API.
*/
@@ -116,11 +116,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
classOf[ConnectedStreams[_,_]])
checkMethods(
- "SplitStream", "SplitStream",
- classOf[org.apache.flink.streaming.api.datastream.SplitStream[_]],
- classOf[SplitStream[_]])
-
- checkMethods(
"WindowedStream", "WindowedStream",
classOf[org.apache.flink.streaming.api.datastream.WindowedStream[_, _, _]],
classOf[WindowedStream[_, _, _]])
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java
deleted file mode 100644
index c50e708..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.streaming.runtime;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Integration tests for a streaming {@link OutputSelector}.
- */
-public class DirectedOutputITCase extends AbstractTestBase {
-
- private static final String TEN = "ten";
- private static final String ODD = "odd";
- private static final String EVEN = "even";
- private static final String NON_SELECTED = "nonSelected";
-
- static final class MyOutputSelector implements OutputSelector<Long> {
- private static final long serialVersionUID = 1L;
-
- List<String> outputs = new ArrayList<String>();
-
- @Override
- public Iterable<String> select(Long value) {
- outputs.clear();
- if (value % 2 == 0) {
- outputs.add(EVEN);
- } else {
- outputs.add(ODD);
- }
-
- if (value == 10L) {
- outputs.add(TEN);
- }
-
- if (value == 11L) {
- outputs.add(NON_SELECTED);
- }
- return outputs;
- }
- }
-
- @Test
- public void outputSelectorTest() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- TestListResultSink<Long> evenSink = new TestListResultSink<Long>();
- TestListResultSink<Long> oddAndTenSink = new TestListResultSink<Long>();
- TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
- TestListResultSink<Long> allSink = new TestListResultSink<Long>();
-
- SplitStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
- source.select(EVEN).addSink(evenSink);
- source.select(ODD, TEN).addSink(oddAndTenSink);
- source.select(EVEN, ODD).addSink(evenAndOddSink);
- source.addSink(allSink);
-
- env.execute();
- assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), evenSink.getSortedResult());
- assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
- assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
- evenAndOddSink.getSortedResult());
- assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
- allSink.getSortedResult());
- }
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index fbe6616..bc9980f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -32,8 +32,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -44,12 +44,12 @@ import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
-import org.apache.flink.test.streaming.runtime.util.EvenOddOutputSelector;
import org.apache.flink.test.streaming.runtime.util.NoOpIntMap;
import org.apache.flink.test.streaming.runtime.util.ReceiveCheckNoOpSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.OutputTag;
import org.junit.Test;
import org.slf4j.Logger;
@@ -223,11 +223,25 @@ public class IterateITCase extends AbstractTestBase {
DataStreamSink<Integer> head3 = iter1.map(noOpIntMap).setParallelism(parallelism / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
DataStreamSink<Integer> head4 = iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
- SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+ OutputTag<Integer> even = new OutputTag<Integer>("even") {};
+ OutputTag<Integer> odd = new OutputTag<Integer>("odd") {};
+ SingleOutputStreamOperator<Object> source3 = env.fromElements(1, 2, 3, 4, 5)
.map(noOpIntMap).name("EvenOddSourceMap")
- .split(new EvenOddOutputSelector());
+ .process(new ProcessFunction<Integer, Object>() {
+ @Override
+ public void processElement(
+ Integer value,
+ Context ctx,
+ Collector<Object> out) throws Exception {
+ if (value % 2 == 0) {
+ ctx.output(even, value);
+ } else {
+ ctx.output(odd, value);
+ }
+ }
+ });
- iter1.closeWith(source3.select("even").union(
+ iter1.closeWith(source3.getSideOutput(even).union(
head1.rebalance().map(noOpIntMap).broadcast(), head2.shuffle()));
StreamGraph graph = env.getStreamGraph();
@@ -263,7 +277,6 @@ public class IterateITCase extends AbstractTestBase {
if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("EvenOddSourceMap")) {
assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
- assertTrue(edge.getSelectedNames().contains("even"));
}
}
@@ -307,13 +320,27 @@ public class IterateITCase extends AbstractTestBase {
.addSink(new ReceiveCheckNoOpSink<Integer>());
DataStreamSink<Integer> head4 = iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
- SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+ OutputTag<Integer> even = new OutputTag<Integer>("even") {};
+ OutputTag<Integer> odd = new OutputTag<Integer>("odd") {};
+ SingleOutputStreamOperator<Object> source3 = env.fromElements(1, 2, 3, 4, 5)
.map(noOpIntMap)
.name("split")
- .split(new EvenOddOutputSelector());
+ .process(new ProcessFunction<Integer, Object>() {
+ @Override
+ public void processElement(
+ Integer value,
+ Context ctx,
+ Collector<Object> out) throws Exception {
+ if (value % 2 == 0) {
+ ctx.output(even, value);
+ } else {
+ ctx.output(odd, value);
+ }
+ }
+ });
iter1.closeWith(
- source3.select("even").union(
+ source3.getSideOutput(even).union(
head1.map(noOpIntMap).name("bc").broadcast(),
head2.map(noOpIntMap).shuffle()));
@@ -345,7 +372,6 @@ public class IterateITCase extends AbstractTestBase {
String tailName = graph.getSourceVertex(edge).getOperatorName();
if (tailName.equals("split")) {
assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
- assertTrue(edge.getSelectedNames().contains("even"));
} else if (tailName.equals("bc")) {
assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
} else if (tailName.equals("shuffle")) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java
deleted file mode 100644
index 0a509b9..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.streaming.runtime;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Integration tests for a streaming {@link OutputSelector}.
- */
-public class OutputSplitterITCase extends AbstractTestBase {
-
- private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
-
- @SuppressWarnings("unchecked")
- @Test
- public void testOnMergedDataStream() throws Exception {
- TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
- TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.setBufferTimeout(1);
-
- DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
- DataStream<Integer> d2 = env.fromElements(1, 3, 5, 7, 9);
-
- d1 = d1.union(d2);
-
- d1.split(new OutputSelector<Integer>() {
- private static final long serialVersionUID = 8354166915727490130L;
-
- @Override
- public Iterable<String> select(Integer value) {
- List<String> s = new ArrayList<String>();
- if (value > 4) {
- s.add(">");
- } else {
- s.add("<");
- }
- return s;
- }
- }).select(">").addSink(splitterResultSink1);
-
- d1.split(new OutputSelector<Integer>() {
- private static final long serialVersionUID = -6822487543355994807L;
-
- @Override
- public Iterable<String> select(Integer value) {
- List<String> s = new ArrayList<String>();
- if (value % 3 == 0) {
- s.add("yes");
- } else {
- s.add("no");
- }
- return s;
- }
- }).select("yes").addSink(splitterResultSink2);
- env.execute();
-
- expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(5, 6, 7, 8, 9));
- assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
-
- expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(0, 3, 6, 9));
- assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
- }
-
- @Test
- public void testOnSingleDataStream() throws Exception {
- TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
- TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.setBufferTimeout(1);
-
- DataStream<Integer> ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-
- ds.split(new OutputSelector<Integer>() {
- private static final long serialVersionUID = 2524335410904414121L;
-
- @Override
- public Iterable<String> select(Integer value) {
- List<String> s = new ArrayList<String>();
- if (value % 2 == 0) {
- s.add("even");
- } else {
- s.add("odd");
- }
- return s;
- }
- }).select("even").addSink(splitterResultSink1);
-
- ds.split(new OutputSelector<Integer>() {
-
- private static final long serialVersionUID = -511693919586034092L;
-
- @Override
- public Iterable<String> select(Integer value) {
- List<String> s = new ArrayList<String>();
- if (value % 4 == 0) {
- s.add("yes");
- } else {
- s.add("no");
- }
- return s;
- }
- }).select("yes").addSink(splitterResultSink2);
- env.execute();
-
- expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(0, 2, 4, 6, 8));
- assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
-
- expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(0, 4, 8));
- assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
- }
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java
deleted file mode 100644
index 804c19b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.streaming.runtime.util;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-import java.util.Collections;
-
-/**
- * {@link OutputSelector} mapping integers to "even" and "odd" streams.
- */
-public class EvenOddOutputSelector implements OutputSelector<Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Iterable<String> select(Integer value) {
- return value % 2 == 0 ? Collections.singleton("even") : Collections.singleton("odd");
- }
-}