You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/10 11:48:40 UTC

[06/17] flink git commit: [FLINK-9141][datastream] Fail early when using both split and side-outputs

[FLINK-9141][datastream] Fail early when using both split and side-outputs

This closes #5836.
This closes #5479.
This closes #4893.
This closes #4809.
This closes #4621.
This closes #3915.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b32ad44f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b32ad44f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b32ad44f

Branch: refs/heads/master
Commit: b32ad44f7b6d9053aefa3c9db7a9328b1571b19c
Parents: e7b7934
Author: zentol <ch...@apache.org>
Authored: Wed Apr 11 11:13:52 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 10 08:26:13 2018 +0200

----------------------------------------------------------------------
 .../datastream/SingleOutputStreamOperator.java  | 19 +++++
 .../api/datastream/SplitSideOutputTest.java     | 78 ++++++++++++++++++++
 2 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b32ad44f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index cc57714..ccd1ac1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
@@ -54,6 +55,8 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 	 */
 	private Map<OutputTag<?>, TypeInformation> requestedSideOutputs = new HashMap<>();
 
+	private boolean wasSplitApplied = false; // set by split(); checked by getSideOutput() to reject mixing both
+
 	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
 		super(environment, transformation);
 	}
@@ -376,6 +379,17 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 		return this;
 	}
 
+	@Override
+	public SplitStream<T> split(OutputSelector<T> outputSelector) {
+		// split() and getSideOutput() must not be combined on the same operator, so fail right here
+		if (!requestedSideOutputs.isEmpty()) {
+			throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
+				"As a work-around, please add a no-op map function before the split() call.");
+		}
+		wasSplitApplied = true; // lets getSideOutput() reject any later call on this operator
+		return super.split(outputSelector);
+	}
+
 	/**
 	 * Gets the {@link DataStream} that contains the elements that are emitted from an operation
 	 * into the side output with the given {@link OutputTag}.
@@ -383,6 +397,11 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 	 * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
 	 */
 	public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
+		if (wasSplitApplied) {
+			throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
+				"As a work-around, please add a no-op map function before the split() call.");
+		}
+
 		sideOutputTag = clean(requireNonNull(sideOutputTag));
 
 		// make a defensive copy

http://git-wip-us.apache.org/repos/asf/flink/blob/b32ad44f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
new file mode 100644
index 0000000..8f33e19
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Verifies that combining split() and getSideOutput() on one {@link SingleOutputStreamOperator} fails eagerly.
+ */
+public class SplitSideOutputTest {
+
+	private static final OutputTag<String> outputTag = new OutputTag<String>("outputTag") {};
+
+	/** Requesting a side output from an operator that was already split must throw. */
+	@Test
+	public void testSideOutputAfterSelectIsForbidden() {
+		SingleOutputStreamOperator<String> operator = createProcessOperator();
+		operator.split(Collections::singleton);
+
+		try {
+			operator.getSideOutput(outputTag);
+		} catch (UnsupportedOperationException expected) {
+			return; // expected: the conflicting call was refused
+		}
+		Assert.fail("Should have failed early with an exception.");
+	}
+
+	/** Splitting an operator whose side output was already requested must throw. */
+	@Test
+	public void testSelectAfterSideOutputIsForbidden() {
+		SingleOutputStreamOperator<String> operator = createProcessOperator();
+		operator.getSideOutput(outputTag);
+
+		try {
+			operator.split(Collections::singleton);
+		} catch (UnsupportedOperationException expected) {
+			return; // expected: the conflicting call was refused
+		}
+		Assert.fail("Should have failed early with an exception.");
+	}
+
+	/** Builds a one-element source followed by a {@link DummyProcessFunction} process operator. */
+	private static SingleOutputStreamOperator<String> createProcessOperator() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		return env.fromElements("foo").process(new DummyProcessFunction());
+	}
+
+	/** Process function with an empty body: it forwards nothing to the collector or any side output. */
+	private static final class DummyProcessFunction extends ProcessFunction<String, String> {
+		@Override
+		public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
+		}
+	}
+}