You are viewing a plain-text version of this content. The canonical link to the original appeared as a hyperlink labeled "here", which is not preserved in this plain-text version.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/11 08:53:36 UTC

[GitHub] dawidwys closed pull request #7258: [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation

dawidwys closed pull request #7258: [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation
URL: https://github.com/apache/flink/pull/7258
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 271d9bec33f..a0af5f1a8c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -234,7 +234,9 @@ public ExecutionConfig getExecutionConfig() {
 	 *            {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
 	 *            for directing the tuples.
 	 * @return The {@link SplitStream}
+	 * @deprecated Please use side ouput instead.
 	 */
+	@Deprecated
 	public SplitStream<T> split(OutputSelector<T> outputSelector) {
 		return new SplitStream<>(this, clean(outputSelector));
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
index 0beae32435d..7f28dc7b497 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
@@ -33,6 +33,7 @@
  * @param <OUT> The type of the elements in the Stream
  */
 
+@Deprecated
 @PublicEvolving
 public class SplitStream<OUT> extends DataStream<OUT> {
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 2c4ae4a993d..82400036c3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -258,6 +258,8 @@ private StreamGraph generateInternal(List<StreamTransformation<?>> transformatio
 		StreamTransformation<T> input = split.getInput();
 		Collection<Integer> resultIds = transform(input);
 
+		validateSplitTransformation(input);
+
 		// the recursive transform call might have transformed this already
 		if (alreadyTransformed.containsKey(split)) {
 			return alreadyTransformed.get(split);
@@ -643,4 +645,20 @@ private String determineSlotSharingGroup(String specifiedGroup, Collection<Integ
 			return inputGroup == null ? "default" : inputGroup;
 		}
 	}
+
+	private <T> void validateSplitTransformation(StreamTransformation<T> input) {
+		if (input instanceof SelectTransformation || input instanceof SplitTransformation) {
+			throw new IllegalStateException("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+		} else if (input instanceof SideOutputTransformation) {
+			throw new IllegalStateException("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
+		} else if (input instanceof UnionTransformation) {
+			for (StreamTransformation<T> transformation : ((UnionTransformation<T>) input).getInputs()) {
+				validateSplitTransformation(transformation);
+			}
+		} else if (input instanceof PartitionTransformation) {
+			validateSplitTransformation(((PartitionTransformation) input).getInput());
+		} else {
+			return;
+		}
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index aa6774dab8f..215e5485648 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -79,6 +79,7 @@
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.core.StringStartsWith;
@@ -952,12 +953,7 @@ public boolean filter(Integer value) throws Exception {
 			fail(e.getMessage());
 		}
 
-		OutputSelector<Integer> outputSelector = new OutputSelector<Integer>() {
-			@Override
-			public Iterable<String> select(Integer value) {
-				return null;
-			}
-		};
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
 
 		SplitStream<Integer> split = unionFilter.split(outputSelector);
 		split.select("dummy").addSink(new DiscardingSink<Integer>());
@@ -1091,6 +1087,91 @@ public void testChannelSelectors() {
 		assertTrue(globalPartitioner instanceof GlobalPartitioner);
 	}
 
+	/////////////////////////////////////////////////////////////
+	// Split testing
+	/////////////////////////////////////////////////////////////
+
+	@Test
+	public void testConsecutiveSplitRejection() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+
+		src.split(outputSelector).split(outputSelector).addSink(new DiscardingSink<>());
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+		env.getStreamGraph();
+	}
+
+	@Test
+	public void testSplitAfterSideOutputRejection() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+		OutputTag<Integer> outputTag = new OutputTag<Integer>("dummy"){};
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+
+		src.getSideOutput(outputTag).split(outputSelector).addSink(new DiscardingSink<>());
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
+
+		env.getStreamGraph();
+	}
+
+	@Test
+	public void testSelectBetweenConsecutiveSplitRejection() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+
+		src.split(outputSelector).select("dummy").split(outputSelector).addSink(new DiscardingSink<>());
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+		env.getStreamGraph();
+	}
+
+	@Test
+	public void testUnionBetweenConsecutiveSplitRejection() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+
+		src.split(outputSelector).select("dummy").union(src.map(x -> x)).split(outputSelector).addSink(new DiscardingSink<>());
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+		env.getStreamGraph();
+	}
+
+	@Test
+	public void testKeybyBetweenConsecutiveSplitRejection() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Integer> src = env.fromElements(0, 0);
+
+		OutputSelector<Integer> outputSelector = new DummyOutputSelector<>();
+
+		src.split(outputSelector).select("dummy").keyBy(x -> x).split(outputSelector).addSink(new DiscardingSink<>());
+
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
+
+		env.getStreamGraph();
+	}
+
 	/////////////////////////////////////////////////////////////
 	// KeyBy testing
 	/////////////////////////////////////////////////////////////
@@ -1427,4 +1508,11 @@ public int getI() {
 			return i;
 		}
 	}
+
+	private class DummyOutputSelector<Integer> implements OutputSelector<Integer> {
+		@Override
+		public Iterable<String> select(Integer value) {
+			return null;
+		}
+	}
 }
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 23d216549f8..15dca2c9c07 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -915,13 +915,19 @@ class DataStream[T](stream: JavaStream[T]) {
    * Operator used for directing tuples to specific named outputs using an
    * OutputSelector. Calling this method on an operator creates a new
    * [[SplitStream]].
+   *
+   * @deprecated Please use side output instead.
    */
+  @deprecated
   def split(selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))
 
   /**
    * Creates a new [[SplitStream]] that contains only the elements satisfying the
    *  given output selector predicate.
+   *
+   * @deprecated Please use side output instead.
    */
+  @deprecated
   def split(fun: T => TraversableOnce[String]): SplitStream[T] = {
     if (fun == null) {
       throw new NullPointerException("OutputSelector must not be null.")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services