You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/01/11 08:53:46 UTC
[flink] branch release-1.6 updated: [FLINK-11084][datastream]
Forbid using two consecutive split transformations
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 0d3125d [FLINK-11084][datastream] Forbid using two consecutive split transformations
0d3125d is described below
commit 0d3125d6855d91cf2e66b1b11d0bf016c16d32a1
Author: Shimin Yang <ya...@youzan.com>
AuthorDate: Thu Dec 6 14:39:46 2018 +0800
[FLINK-11084][datastream] Forbid using two consecutive split transformations
This closes #7258
---
.../flink/streaming/api/datastream/DataStream.java | 2 +
.../streaming/api/datastream/SplitStream.java | 1 +
.../streaming/api/graph/StreamGraphGenerator.java | 18 ++++
.../apache/flink/streaming/api/DataStreamTest.java | 100 +++++++++++++++++++--
.../flink/streaming/api/scala/DataStream.scala | 6 ++
5 files changed, 121 insertions(+), 6 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8e24ad7..a8486b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -234,7 +234,9 @@ public class DataStream<T> {
* {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
* for directing the tuples.
* @return The {@link SplitStream}
+ * @deprecated Please use side output instead.
*/
+ @Deprecated
public SplitStream<T> split(OutputSelector<T> outputSelector) {
return new SplitStream<>(this, clean(outputSelector));
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
index 0beae32..7f28dc7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
@@ -33,6 +33,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
* @param <OUT> The type of the elements in the Stream
*/
+@Deprecated
@PublicEvolving
public class SplitStream<OUT> extends DataStream<OUT> {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 2c4ae4a..8240003 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -258,6 +258,8 @@ public class StreamGraphGenerator {
StreamTransformation<T> input = split.getInput();
Collection<Integer> resultIds = transform(input);
+ validateSplitTransformation(input);
+
// the recursive transform call might have transformed this already
if (alreadyTransformed.containsKey(split)) {
return alreadyTransformed.get(split);
@@ -643,4 +645,20 @@ public class StreamGraphGenerator {
return inputGroup == null ? "default" : inputGroup;
}
}
+
+ private <T> void validateSplitTransformation(StreamTransformation<T> input) {
+ if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
+ throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+ } else if (input instanceof SideOutputTransformation) {
+ throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
+ } else if (input instanceof UnionTransformation) {
+ for (StreamTransformation<T> transformation : ((UnionTransformation<T>) input).getInputs()) {
+ validateSplitTransformation(transformation);
+ }
+ } else if (input instanceof PartitionTransformation) {
+ validateSplitTransformation(((PartitionTransformation) input).getInput());
+ } else {
+ return;
+ }
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 4d2d6e1..356e3f7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -79,6 +79,7 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.hamcrest.core.StringStartsWith;
@@ -952,12 +953,7 @@ public class DataStreamTest extends TestLogger {
fail(e.getMessage());
}
- OutputSelector<Integer> outputSelector = new OutputSelector<Integer>() {
- @Override
- public Iterable<String> select(Integer value) {
- return null;
- }
- };
+ OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
SplitStream<Integer> split = unionFilter.split(outputSelector);
split.select("dummy").addSink(new DiscardingSink<Integer>());
@@ -1088,6 +1084,91 @@ public class DataStreamTest extends TestLogger {
}
/////////////////////////////////////////////////////////////
+ // Split testing
+ /////////////////////////////////////////////////////////////
+
+ @Test
+ public void testConsecutiveSplitRejection() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+ OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+
+ src.split(outputSelector).split(outputSelector).addSink(new DiscardingSink<>());
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+ env.getStreamGraph();
+ }
+
+ @Test
+ public void testSplitAfterSideOutputRejection() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+ OutputTag<Integer> outputTag = new OutputTag<Integer>("dummy"){};
+ OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+
+ src.getSideOutput(outputTag).split(outputSelector).addSink(new DiscardingSink<>());
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
+
+ env.getStreamGraph();
+ }
+
+ @Test
+ public void testSelectBetweenConsecutiveSplitRejection() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+ OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+
+ src.split(outputSelector).select("dummy").split(outputSelector).addSink(new DiscardingSink<>());
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+ env.getStreamGraph();
+ }
+
+ @Test
+ public void testUnionBetweenConsecutiveSplitRejection() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+ OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+
+ src.split(outputSelector).select("dummy").union(src.map(x -> x)).split(outputSelector).addSink(new DiscardingSink<>());
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+ env.getStreamGraph();
+ }
+
+ @Test
+ public void testKeybyBetweenConsecutiveSplitRejection() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+ OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+
+ src.split(outputSelector).select("dummy").keyBy(x -> x).split(outputSelector).addSink(new DiscardingSink<>());
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+ env.getStreamGraph();
+ }
+
+ /////////////////////////////////////////////////////////////
// KeyBy testing
/////////////////////////////////////////////////////////////
@@ -1423,4 +1504,11 @@ public class DataStreamTest extends TestLogger {
return i;
}
}
+
+ private class DummyOutputSelector<Integer> implements OutputSelector<Integer> {
+ @Override
+ public Iterable<String> select(Integer value) {
+ return null;
+ }
+ }
}
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3a88829..046b558 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -903,13 +903,19 @@ class DataStream[T](stream: JavaStream[T]) {
* Operator used for directing tuples to specific named outputs using an
* OutputSelector. Calling this method on an operator creates a new
* [[SplitStream]].
+ *
+ * @deprecated Please use side output instead.
*/
+ @deprecated
def split(selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))
/**
* Creates a new [[SplitStream]] that contains only the elements satisfying the
* given output selector predicate.
+ *
+ * @deprecated Please use side output instead.
*/
+ @deprecated
def split(fun: T => TraversableOnce[String]): SplitStream[T] = {
if (fun == null) {
throw new NullPointerException("OutputSelector must not be null.")