You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/01/11 08:52:01 UTC

[flink] branch master updated: [FLINK-11084][datastream] Forbid using two consecutive split transformations

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e0efabe  [FLINK-11084][datastream] Forbid using two consecutive split transformations
e0efabe is described below

commit e0efabe8884f22b4a1c7ab9df3274b3fca03dcfb
Author: Shimin Yang <ya...@youzan.com>
AuthorDate: Thu Dec 6 14:39:46 2018 +0800

    [FLINK-11084][datastream] Forbid using two consecutive split transformations
    
    This closes #7258
---
 .../flink/streaming/api/datastream/DataStream.java |   2 +
 .../streaming/api/datastream/SplitStream.java      |   1 +
 .../streaming/api/graph/StreamGraphGenerator.java  |  18 ++++
 .../apache/flink/streaming/api/DataStreamTest.java | 100 +++++++++++++++++++--
 .../flink/streaming/api/scala/DataStream.scala     |   6 ++
 5 files changed, 121 insertions(+), 6 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 271d9be..a0af5f1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -234,7 +234,9 @@ public class DataStream<T> {
 	 *            {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
 	 *            for directing the tuples.
 	 * @return The {@link SplitStream}
+	 * @deprecated Please use side output instead.
 	 */
+	@Deprecated
 	public SplitStream<T> split(OutputSelector<T> outputSelector) {
 		return new SplitStream<>(this, clean(outputSelector));
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
index 0beae32..7f28dc7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
@@ -33,6 +33,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
  * @param <OUT> The type of the elements in the Stream
  */
 
+@Deprecated
 @PublicEvolving
 public class SplitStream<OUT> extends DataStream<OUT> {
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 2c4ae4a..8240003 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -258,6 +258,8 @@ public class StreamGraphGenerator {
 		StreamTransformation<T> input = split.getInput();
 		Collection<Integer> resultIds = transform(input);
 
+		validateSplitTransformation(input);
+
 		// the recursive transform call might have transformed this already
 		if (alreadyTransformed.containsKey(split)) {
 			return alreadyTransformed.get(split);
@@ -643,4 +645,20 @@ public class StreamGraphGenerator {
 			return inputGroup == null ? "default" : inputGroup;
 		}
 	}
+
+	/** Fails with {@link IllegalStateException} if the input of a split, looking through union and
+	 * partition transformations, is a split, a select or a side-output; other inputs are accepted. */
+	private <T> void validateSplitTransformation(StreamTransformation<T> input) {
+		if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
+			throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+		} else if (input instanceof SideOutputTransformation) {
+			throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
+		} else if (input instanceof UnionTransformation) {
+			for (StreamTransformation<T> transformation : ((UnionTransformation<T>) input).getInputs()) {
+				validateSplitTransformation(transformation);
+			}
+		} else if (input instanceof PartitionTransformation) {
+			validateSplitTransformation(((PartitionTransformation<T>) input).getInput());
+		}
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index aa6774d..215e548 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -79,6 +79,7 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.core.StringStartsWith;
@@ -952,12 +953,7 @@ public class DataStreamTest extends TestLogger {
 			fail(e.getMessage());
 		}
 
-		OutputSelector<Integer> outputSelector = new OutputSelector<Integer>() {
-			@Override
-			public Iterable<String> select(Integer value) {
-				return null;
-			}
-		};
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
 
 		SplitStream<Integer> split = unionFilter.split(outputSelector);
 		split.select("dummy").addSink(new DiscardingSink<Integer>());
@@ -1092,6 +1088,91 @@ public class DataStreamTest extends TestLogger {
 	}
 
 	/////////////////////////////////////////////////////////////
+	// Split testing
+	/////////////////////////////////////////////////////////////
+
+	@Test
+	public void testConsecutiveSplitRejection() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+		// Topology under test: a split applied directly to the stream returned by another split.
+		src.split(outputSelector).split(outputSelector).addSink(new DiscardingSink<>());
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+		env.getStreamGraph();
+	}
+
+	@Test
+	public void testSplitAfterSideOutputRejection() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+		OutputTag<Integer> outputTag = new OutputTag<Integer>("dummy"){};
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+		// Topology under test: a split applied to the stream obtained from getSideOutput().
+		src.getSideOutput(outputTag).split(outputSelector).addSink(new DiscardingSink<>());
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
+
+		env.getStreamGraph();
+	}
+
+	@Test
+	public void testSelectBetweenConsecutiveSplitRejection() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+		// Topology under test: split -> select -> split; the select in between must not hide the first split.
+		src.split(outputSelector).select("dummy").split(outputSelector).addSink(new DiscardingSink<>());
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+		env.getStreamGraph();
+	}
+
+	@Test
+	public void testUnionBetweenConsecutiveSplitRejection() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+		// Topology under test: split -> select -> union (with a mapped copy of the source) -> split.
+		src.split(outputSelector).select("dummy").union(src.map(x -> x)).split(outputSelector).addSink(new DiscardingSink<>());
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+		env.getStreamGraph();
+	}
+
+	@Test
+	public void testKeybyBetweenConsecutiveSplitRejection() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+		// Topology under test: split -> select -> keyBy -> split; the keyBy in between must not hide the first split.
+		src.split(outputSelector).select("dummy").keyBy(x -> x).split(outputSelector).addSink(new DiscardingSink<>());
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+		env.getStreamGraph();
+	}
+
+	/////////////////////////////////////////////////////////////
 	// KeyBy testing
 	/////////////////////////////////////////////////////////////
 
@@ -1427,4 +1508,11 @@ public class DataStreamTest extends TestLogger {
 			return i;
 		}
 	}
+
+	private static class DummyOutputSelector<T> implements OutputSelector<T> {
+		@Override
+		public Iterable<String> select(T value) {
+			return null; // dummy result: no output names are chosen
+		}
+	}
 }
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 23d2165..15dca2c 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -915,13 +915,19 @@ class DataStream[T](stream: JavaStream[T]) {
    * Operator used for directing tuples to specific named outputs using an
    * OutputSelector. Calling this method on an operator creates a new
    * [[SplitStream]].
+   *
+   * @deprecated Please use side output instead.
    */
+  @deprecated
   def split(selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))
 
   /**
    * Creates a new [[SplitStream]] that contains only the elements satisfying the
    *  given output selector predicate.
+   *
+   * @deprecated Please use side output instead.
    */
+  @deprecated
   def split(fun: T => TraversableOnce[String]): SplitStream[T] = {
     if (fun == null) {
       throw new NullPointerException("OutputSelector must not be null.")