You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/10 11:48:40 UTC
[06/17] flink git commit: [FLINK-9141][datastream] Fail early when
using both split and side-outputs
[FLINK-9141][datastream] Fail early when using both split and side-outputs
This closes #5836.
This closes #5479.
This closes #4893.
This closes #4809.
This closes #4621.
This closes #3915.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b32ad44f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b32ad44f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b32ad44f
Branch: refs/heads/master
Commit: b32ad44f7b6d9053aefa3c9db7a9328b1571b19c
Parents: e7b7934
Author: zentol <ch...@apache.org>
Authored: Wed Apr 11 11:13:52 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 10 08:26:13 2018 +0200
----------------------------------------------------------------------
.../datastream/SingleOutputStreamOperator.java | 19 +++++
.../api/datastream/SplitSideOutputTest.java | 78 ++++++++++++++++++++
2 files changed, 97 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b32ad44f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index cc57714..ccd1ac1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
@@ -54,6 +55,8 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
*/
private Map<OutputTag<?>, TypeInformation> requestedSideOutputs = new HashMap<>();
+ private boolean wasSplitApplied = false;
+
protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
super(environment, transformation);
}
@@ -376,6 +379,17 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
return this;
}
+ @Override
+ public SplitStream<T> split(OutputSelector<T> outputSelector) {
+ if (requestedSideOutputs.isEmpty()) {
+ wasSplitApplied = true;
+ return super.split(outputSelector);
+ } else {
+ throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
+ "As a work-around, please add a no-op map function before the split() call.");
+ }
+ }
+
/**
* Gets the {@link DataStream} that contains the elements that are emitted from an operation
* into the side output with the given {@link OutputTag}.
@@ -383,6 +397,11 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
*/
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
+ if (wasSplitApplied) {
+ throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
+ "As a work-around, please add a no-op map function before the split() call.");
+ }
+
sideOutputTag = clean(requireNonNull(sideOutputTag));
// make a defensive copy
http://git-wip-us.apache.org/repos/asf/flink/blob/b32ad44f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
new file mode 100644
index 0000000..8f33e19
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}.
+ */
+public class SplitSideOutputTest {
+
+ private static final OutputTag<String> outputTag = new OutputTag<String>("outputTag") {};
+
+ @Test
+ public void testSideOutputAfterSelectIsForbidden() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
+ .process(new DummyProcessFunction());
+
+ processInput.split(Collections::singleton);
+
+ try {
+ processInput.getSideOutput(outputTag);
+ Assert.fail("Should have failed early with an exception.");
+ } catch (UnsupportedOperationException expected){
+ // expected
+ }
+ }
+
+ @Test
+ public void testSelectAfterSideOutputIsForbidden() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
+ .process(new DummyProcessFunction());
+
+ processInput.getSideOutput(outputTag);
+
+ try {
+ processInput.split(Collections::singleton);
+ Assert.fail("Should have failed early with an exception.");
+ } catch (UnsupportedOperationException expected){
+ // expected
+ }
+ }
+
+ private static final class DummyProcessFunction extends ProcessFunction<String, String> {
+
+ @Override
+ public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
+ }
+ }
+}