You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:43 UTC
[04/11] flink git commit: [FLINK-4460] Add support for side outputs
[FLINK-4460] Add support for side outputs
This does not yet allow users to emit to side outputs in user functions.
Only operators (StreamOperator) can emit to side outputs. A side output
can be retrieved on a SingleOutputStreamOperator.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e134d275
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e134d275
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e134d275
Branch: refs/heads/master
Commit: e134d27589ead89882d94969edeeb171ee4433b1
Parents: f31a55e
Author: Chen Qin <qi...@gmail.com>
Authored: Fri Oct 21 12:38:04 2016 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100
----------------------------------------------------------------------
.../flink/api/common/typeinfo/TypeHint.java | 9 +-
.../java/org/apache/flink/util/OutputTag.java | 98 ++++++++++++++++++++
.../api/collector/selector/DirectedOutput.java | 6 ++
.../datastream/SingleOutputStreamOperator.java | 14 +++
.../flink/streaming/api/graph/StreamConfig.java | 18 +++-
.../flink/streaming/api/graph/StreamEdge.java | 17 +++-
.../flink/streaming/api/graph/StreamGraph.java | 46 +++++++--
.../api/graph/StreamGraphGenerator.java | 34 ++++++-
.../api/graph/StreamingJobGraphGenerator.java | 20 ++++
.../api/operators/AbstractStreamOperator.java | 7 ++
.../flink/streaming/api/operators/Output.java | 9 ++
.../SideOutputTransformation.java | 73 +++++++++++++++
.../runtime/io/RecordWriterOutput.java | 30 +++++-
.../streaming/runtime/tasks/OperatorChain.java | 42 +++++++--
.../runtime/tasks/StreamIterationTail.java | 7 ++
.../runtime/tasks/OneInputStreamTaskTest.java | 6 +-
.../runtime/tasks/StreamTaskTestHarness.java | 3 +-
.../tasks/TwoInputStreamTaskTestHarness.java | 6 +-
.../util/AbstractStreamOperatorTestHarness.java | 32 ++++++-
.../flink/streaming/util/CollectorOutput.java | 6 ++
.../apache/flink/streaming/util/MockOutput.java | 6 ++
.../flink/streaming/api/scala/DataStream.scala | 8 +-
22 files changed, 467 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
index 975d6e3..0f40710 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
@@ -46,7 +46,14 @@ public abstract class TypeHint<T> {
public TypeHint() {
this.typeInfo = TypeExtractor.createTypeInfo(this, TypeHint.class, getClass(), 0);
}
-
+
+ /**
+ * Creates a hint for the generic type in the class signature.
+ */
+ public TypeHint(Class<?> baseClass, Object instance, int genericParameterPos) {
+ this.typeInfo = TypeExtractor.createTypeInfo(instance, baseClass, instance.getClass(), genericParameterPos);
+ }
+
// ------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-core/src/main/java/org/apache/flink/util/OutputTag.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/OutputTag.java b/flink-core/src/main/java/org/apache/flink/util/OutputTag.java
new file mode 100644
index 0000000..eda3ab8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/OutputTag.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){});
+ * }</pre>
+ *
+ * @param <T> the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag<T> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String id;
+
+ private transient TypeInformation<T> typeInfo;
+
+ /**
+ * Creates a new named {@code OutputTag} with the given id.
+ *
+ * @param id The id of the created {@code OutputTag}.
+ */
+ public OutputTag(String id) {
+ this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+
+ try {
+ TypeHint<T> typeHint = new TypeHint<T>(OutputTag.class, this, 0) {};
+ this.typeInfo = typeHint.getTypeInfo();
+ } catch (InvalidTypesException e) {
+ throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+ "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+ }
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ typeInfo = null;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public TypeInformation<T> getTypeInfo() {
+ return typeInfo;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof OutputTag
+ && ((OutputTag) obj).id.equals(this.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "OutputTag(" + getTypeInfo() + ", " + id + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
index 24f1c63..dabe804 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.util.XORShiftRandom;
@@ -139,6 +140,11 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
}
@Override
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+ throw new UnsupportedOperationException("Cannot use split/select with side outputs.");
+ }
+
+ @Override
public void close() {
for (Output<StreamRecord<OUT>> out : allOutputs) {
out.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index f1d5e3a..e61b39a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -27,6 +28,7 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Preconditions;
@@ -416,4 +418,16 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+ /**
+ * Gets the {@link DataStream} that contains the elements that are emitted from an operation
+ * into the side output with the given {@link OutputTag}.
+ *
+ * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
+ */
+ public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag){
+ sideOutputTag = clean(sideOutputTag);
+ SideOutputTransformation<X> sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), requireNonNull(sideOutputTag));
+ return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index dd4c55c..93a6387 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -38,6 +38,8 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
@Internal
public class StreamConfig implements Serializable {
@@ -63,6 +65,7 @@ public class StreamConfig implements Serializable {
private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
+ private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_";
private static final String ITERATON_WAIT = "iterationWait";
private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
private static final String EDGES_IN_ORDER = "edgesInOrder";
@@ -139,6 +142,10 @@ public class StreamConfig implements Serializable {
public void setTypeSerializerOut(TypeSerializer<?> serializer) {
setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
}
+
+ public void setTypeSerializerSideOut(OutputTag<?> outputTag, TypeSerializer<?> serializer) {
+ setTypeSerializer(TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), serializer);
+ }
public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
try {
@@ -155,7 +162,7 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Could not instantiate serializer.", e);
}
}
-
+
public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
try {
return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
@@ -164,6 +171,15 @@ public class StreamConfig implements Serializable {
}
}
+ public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) {
+ Preconditions.checkNotNull(outputTag, "Side output id must not be null.");
+ try {
+ return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
+ } catch (Exception e) {
+ throw new StreamTaskException("Could not instantiate serializer.", e);
+ }
+ }
+
private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
try {
InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index e2bcac1..8e1c361 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.util.List;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
/**
@@ -48,15 +49,25 @@ public class StreamEdge implements Serializable {
* output selection).
*/
private final List<String> selectedNames;
+
+ /**
+ * The side-output tag (if any) of this {@link StreamEdge}.
+ */
+ private final OutputTag outputTag;
+
+ /**
+ * The {@link StreamPartitioner} on this {@link StreamEdge}.
+ */
private StreamPartitioner<?> outputPartitioner;
public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
- List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
+ List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) {
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
+ this.outputTag = outputTag;
this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
+ "_" + outputPartitioner;
@@ -86,6 +97,8 @@ public class StreamEdge implements Serializable {
return selectedNames;
}
+ public OutputTag getOutputTag() {return this.outputTag;}
+
public StreamPartitioner<?> getPartitioner() {
return outputPartitioner;
}
@@ -117,6 +130,6 @@ public class StreamEdge implements Serializable {
public String toString() {
return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
+ ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
- + ')';
+ + ", outputTag=" + outputTag + ')';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index c8d5340..e792a5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -32,6 +32,7 @@ import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -85,6 +86,7 @@ public class StreamGraph extends StreamingPlan {
private Set<Integer> sources;
private Set<Integer> sinks;
private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
+ private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;
protected Map<Integer, String> vertexIDtoBrokerID;
@@ -110,6 +112,7 @@ public class StreamGraph extends StreamingPlan {
public void clear() {
streamNodes = new HashMap<>();
virtualSelectNodes = new HashMap<>();
+ virtualSideOutputNodes = new HashMap<>();
virtualPartitionNodes = new HashMap<>();
vertexIDtoBrokerID = new HashMap<>();
vertexIDtoLoopTimeout = new HashMap<>();
@@ -294,6 +297,23 @@ public class StreamGraph extends StreamingPlan {
}
/**
+ * Adds a new virtual node that is used to connect a downstream vertex to only the outputs with
+ * the selected side-output {@link OutputTag}.
+ *
+ * @param originalId ID of the node that should be connected to.
+ * @param virtualId ID of the virtual node.
+ * @param outputTag The selected side-output {@code OutputTag}.
+ */
+ public void addVirtualSideOutputNode(Integer originalId, Integer virtualId, OutputTag outputTag) {
+
+ if (virtualSideOutputNodes.containsKey(virtualId)) {
+ throw new IllegalStateException("Already has virtual output node with id " + virtualId);
+ }
+
+ virtualSideOutputNodes.put(virtualId, new Tuple2<>(originalId, outputTag));
+ }
+
+ /**
* Adds a new virtual node that is used to connect a downstream vertex to an input with a certain
* partitioning.
*
@@ -318,7 +338,10 @@ public class StreamGraph extends StreamingPlan {
* Determines the slot sharing group of an operation across virtual nodes.
*/
public String getSlotSharingGroup(Integer id) {
- if (virtualSelectNodes.containsKey(id)) {
+ if (virtualSideOutputNodes.containsKey(id)) {
+ Integer mappedId = virtualSideOutputNodes.get(id).f0;
+ return getSlotSharingGroup(mappedId);
+ } else if (virtualSelectNodes.containsKey(id)) {
Integer mappedId = virtualSelectNodes.get(id).f0;
return getSlotSharingGroup(mappedId);
} else if (virtualPartitionNodes.containsKey(id)) {
@@ -335,7 +358,7 @@ public class StreamGraph extends StreamingPlan {
downStreamVertexID,
typeNumber,
null,
- new ArrayList<String>());
+ new ArrayList<String>(), null);
}
@@ -343,24 +366,31 @@ public class StreamGraph extends StreamingPlan {
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
- List<String> outputNames) {
+ List<String> outputNames,
+ OutputTag outputTag) {
-
- if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+ if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
+ int virtualId = upStreamVertexID;
+ upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
+ if (outputTag == null) {
+ outputTag = virtualSideOutputNodes.get(virtualId).f1;
+ }
+ addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
+ } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
- addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+ addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
- addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+ addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
} else {
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
@@ -382,7 +412,7 @@ public class StreamGraph extends StreamingPlan {
}
}
- StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner);
+ StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 2defbef..df10ae4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.SelectTransformation;
+import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.SplitTransformation;
@@ -184,6 +185,8 @@ public class StreamGraphGenerator {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
+ } else if (transform instanceof SideOutputTransformation<?>) {
+ transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
@@ -302,6 +305,35 @@ public class StreamGraphGenerator {
}
/**
+ * Transforms a {@code SideOutputTransformation}.
+ *
+ * <p>
+ * For this we create a virtual node in the {@code StreamGraph} that holds the side-output
+ * {@link org.apache.flink.util.OutputTag}.
+ *
+ * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
+ */
+ private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
+ StreamTransformation<?> input = sideOutput.getInput();
+ Collection<Integer> resultIds = transform(input);
+
+
+ // the recursive transform might have already transformed this
+ if (alreadyTransformed.containsKey(sideOutput)) {
+ return alreadyTransformed.get(sideOutput);
+ }
+
+ List<Integer> virtualResultIds = new ArrayList<>();
+
+ for (int inputId : resultIds) {
+ int virtualId = StreamTransformation.getNewNodeId();
+ streamGraph.addVirtualSideOutputNode(inputId, virtualId, sideOutput.getOutputTag());
+ virtualResultIds.add(virtualId);
+ }
+ return virtualResultIds;
+ }
+
+ /**
* Transforms a {@code FeedbackTransformation}.
*
* <p>
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 60f8faa..9cf0432 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -374,6 +374,25 @@ public class StreamingJobGraphGenerator {
config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
config.setTypeSerializerOut(vertex.getTypeSerializerOut());
+ // iterate edges, find sideOutput edges create and save serializers for each outputTag type
+ for (StreamEdge edge : chainableOutputs) {
+ if (edge.getOutputTag() != null) {
+ config.setTypeSerializerSideOut(
+ edge.getOutputTag(),
+ edge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig())
+ );
+ }
+ }
+ for (StreamEdge edge : nonChainableOutputs) {
+ if (edge.getOutputTag() != null) {
+ config.setTypeSerializerSideOut(
+ edge.getOutputTag(),
+ edge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig())
+ );
+ }
+ }
+
+
config.setStreamOperator(vertex.getOperator());
config.setOutputSelectors(vertex.getOutputSelectors());
@@ -469,6 +488,7 @@ public class StreamingJobGraphGenerator {
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
+ && edge.getOutputTag() == null // disable chaining for side outputs
&& streamGraph.isChainingEnabled();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index b6f86a8..e40a59b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -66,6 +66,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -862,6 +863,12 @@ public abstract class AbstractStreamOperator<OUT>
}
@Override
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+ numRecordsOut.inc();
+ output.collect(outputTag, record);
+ }
+
+ @Override
public void close() {
output.close();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
index ec2409e..eb10d8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
@@ -20,7 +20,9 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
/**
* A {@link org.apache.flink.streaming.api.operators.StreamOperator} is supplied with an object
@@ -41,5 +43,12 @@ public interface Output<T> extends Collector<T> {
*/
void emitWatermark(Watermark mark);
+ /**
+ * Emits a record the the side output identified by the given {@link OutputTag}.
+ *
+ * @param record The record to collect.
+ */
+ <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record);
+
void emitLatencyMarker(LatencyMarker latencyMarker);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java
new file mode 100644
index 0000000..a90b40d
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * This transformation represents a selection of a side output of an upstream operation with a
+ * given {@link OutputTag}.
+ *
+ * <p>This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code SideOutputTransformation}
+ */
+public class SideOutputTransformation<T> extends StreamTransformation<T> {
+ private final StreamTransformation<?> input;
+
+ private final OutputTag<T> tag;
+
+ public SideOutputTransformation(StreamTransformation<?> input, final OutputTag<T> tag) {
+ super("SideOutput", tag.getTypeInfo(), requireNonNull(input).getParallelism());
+ this.input = input;
+ this.tag = requireNonNull(tag);
+ }
+
+ /**
+ * Returns the input {@code StreamTransformation}.
+ */
+ public StreamTransformation<?> getInput() {
+ return input;
+ }
+
+ public OutputTag<T> getOutputTag() {
+ return tag;
+ }
+
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ List<StreamTransformation<?>> result = Lists.newArrayList();
+ result.add(this);
+ result.addAll(input.getTransitivePredecessors());
+ return result;
+ }
+
+ @Override
+ public final void setChainingStrategy(ChainingStrategy strategy) {
+ throw new UnsupportedOperationException("Cannot set chaining strategy on SideOutput Transformation.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 51c6cd7..d22c60d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import java.io.IOException;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -28,9 +29,9 @@ import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -46,15 +47,18 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
private SerializationDelegate<StreamElement> serializationDelegate;
private final StreamStatusProvider streamStatusProvider;
-
+
+ private final OutputTag outputTag;
+
@SuppressWarnings("unchecked")
public RecordWriterOutput(
StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
TypeSerializer<OUT> outSerializer,
+ OutputTag outputTag,
StreamStatusProvider streamStatusProvider) {
checkNotNull(recordWriter);
-
+ this.outputTag = outputTag;
// generic hack: cast the writer to generic Object type so we can use it
// with multiplexed records and watermarks
this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>)
@@ -72,6 +76,26 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
@Override
public void collect(StreamRecord<OUT> record) {
+ if (this.outputTag != null) {
+ // we are only responsible for emitting to the main input
+ return;
+ }
+
+ pushToRecordWriter(record);
+ }
+
+ @Override
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+ if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+ // we are only responsible for emitting to the side-output specified by our
+ // OutputTag.
+ return;
+ }
+
+ pushToRecordWriter(record);
+ }
+
+ private <X> void pushToRecordWriter(StreamRecord<X> record) {
serializationDelegate.setInstance(record);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index dd93592..e21393a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
@@ -350,9 +351,22 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
private <T> RecordWriterOutput<T> createStreamOutput(
StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
Environment taskEnvironment,
- String taskName)
- {
- TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+ String taskName) {
+ OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
+
+ TypeSerializer outSerializer = null;
+
+ if (edge.getOutputTag() != null) {
+ // side output
+ outSerializer =
+ upStreamConfig.getTypeSerializerSideOut(
+ edge.getOutputTag(),
+ taskEnvironment.getUserClassLoader());
+ } else {
+ // main output
+ outSerializer =
+ upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+ }
@SuppressWarnings("unchecked")
StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
@@ -369,11 +383,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
}
- StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
+ StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
- return new RecordWriterOutput<>(output, outSerializer, this);
+ return new RecordWriterOutput<>(output, outSerializer, sideOutputTag, this);
}
// ------------------------------------------------------------------------
@@ -406,6 +420,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
@Override
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+ // ignore
+ }
+
+ @Override
public void emitWatermark(Watermark mark) {
try {
if (streamStatusProvider.getStreamStatus().isActive()) {
@@ -457,8 +476,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
operator.setKeyContextElement1(copy);
operator.processElement(copy);
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new RuntimeException("Could not forward element to next operator", e);
}
}
@@ -471,7 +489,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
private final Random RNG = new XORShiftRandom();
private final StreamStatusProvider streamStatusProvider;
-
+
public BroadcastingOutputCollector(
Output<StreamRecord<T>>[] outputs,
StreamStatusProvider streamStatusProvider) {
@@ -508,6 +526,14 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
@Override
+ public <X> void collect(
+ OutputTag<?> outputTag, StreamRecord<X> record) {
+ for (Output<StreamRecord<T>> output : outputs) {
+ output.collect(outputTag, record);
+ }
+ }
+
+ @Override
public void close() {
for (Output<StreamRecord<T>> output : outputs) {
output.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index cdac11a..35d14e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,6 +123,12 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
}
@Override
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+ throw new UnsupportedOperationException("Side outputs not used in iteration tail");
+
+ }
+
+ @Override
public void close() {
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index e22bf86..4e8bfd8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -270,6 +270,7 @@ public class OneInputStreamTaskTest extends TestLogger {
new StreamNode(null, 1, null, null, null, null, null),
0,
Collections.<String>emptyList(),
+ null,
null
)));
@@ -281,6 +282,7 @@ public class OneInputStreamTaskTest extends TestLogger {
new StreamNode(null, 2, null, null, null, null, null),
0,
Collections.<String>emptyList(),
+ null,
null
)));
@@ -290,7 +292,8 @@ public class OneInputStreamTaskTest extends TestLogger {
new StreamNode(null, 3, null, null, null, null, null),
0,
Collections.<String>emptyList(),
- new BroadcastPartitioner<Object>()));
+ new BroadcastPartitioner<Object>(),
+ null));
tailOperatorConfig.setStreamOperator(tailOperator);
tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
@@ -641,6 +644,7 @@ public class OneInputStreamTaskTest extends TestLogger {
),
0,
Collections.<String>emptyList(),
+ null,
null
);
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index e58bc5a..c51af4e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -153,7 +153,8 @@ public class StreamTaskTestHarness<OUT> {
List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
StreamNode targetVertexDummy = new StreamNode(null, 1, "group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
- outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
+
+ outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */));
streamConfig.setOutEdgesInOrder(outEdgesInOrder);
streamConfig.setNonChainedOutputs(outEdgesInOrder);
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index f5b7566..2167652 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -128,7 +128,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
targetVertexDummy,
1,
new LinkedList<String>(),
- new BroadcastPartitioner<Object>());
+ new BroadcastPartitioner<Object>(),
+ null /* output tag */);
inPhysicalEdges.add(streamEdge);
break;
@@ -143,7 +144,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
targetVertexDummy,
2,
new LinkedList<String>(),
- new BroadcastPartitioner<Object>());
+ new BroadcastPartitioner<Object>(),
+ null /* output tag */);
inPhysicalEdges.add(streamEdge);
break;
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index f0a4c42..74012b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.util;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
@@ -70,8 +71,10 @@ import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.mockito.Matchers.any;
@@ -88,6 +91,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
final protected ConcurrentLinkedQueue<Object> outputList;
+ final protected Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> sideOutputLists;
+
final protected StreamConfig config;
final protected ExecutionConfig executionConfig;
@@ -147,6 +152,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
final Environment environment) throws Exception {
this.operator = operator;
this.outputList = new ConcurrentLinkedQueue<>();
+ this.sideOutputLists = new HashMap<>();
+
Configuration underlyingConfig = environment.getTaskConfiguration();
this.config = new StreamConfig(underlyingConfig);
this.config.setCheckpointingEnabled(true);
@@ -263,6 +270,10 @@ public class AbstractStreamOperatorTestHarness<OUT> {
return outputList;
}
+ public ConcurrentLinkedQueue<Object> getSideOutput(OutputTag tag) {
+ return sideOutputLists.get(tag);
+ }
+
/**
* Get only the {@link StreamRecord StreamRecords} emitted by the operator.
*/
@@ -610,6 +621,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
private TypeSerializer<OUT> outputSerializer;
+ private TypeSerializer sideOutputSerializer;
+
MockOutput() {
this(null);
}
@@ -634,13 +647,30 @@ public class AbstractStreamOperatorTestHarness<OUT> {
outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
}
if (element.hasTimestamp()) {
- outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),element.getTimestamp()));
+ outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()), element.getTimestamp()));
} else {
outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue())));
}
}
@Override
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+ sideOutputSerializer = TypeExtractor.getForObject(record.getValue()).createSerializer(executionConfig);
+
+ ConcurrentLinkedQueue<Object> sideOutputList = sideOutputLists.get(outputTag);
+ if (sideOutputList == null) {
+ sideOutputList = new ConcurrentLinkedQueue<>();
+ sideOutputLists.put(outputTag, sideOutputList);
+ }
+ if (record.hasTimestamp()) {
+ sideOutputList.add(new StreamRecord<>(sideOutputSerializer.copy(record.getValue()), record.getTimestamp()));
+ } else {
+ sideOutputList.add(new StreamRecord<>(sideOutputSerializer.copy(record.getValue())));
+ }
+
+ }
+
+ @Override
public void close() {
// ignore
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
index fcc8a6c..07b37c8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
import java.io.Serializable;
import java.util.List;
@@ -53,5 +54,10 @@ public class CollectorOutput<T> implements Output<StreamRecord<T>> {
}
@Override
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+ throw new UnsupportedOperationException("Side output not supported for CollectorOutput");
+ }
+
+ @Override
public void close() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
index bf3a488..867080c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
public class MockOutput<T> implements Output<StreamRecord<T>> {
private Collection<T> outputs;
@@ -40,6 +41,11 @@ public class MockOutput<T> implements Output<StreamRecord<T>> {
}
@Override
+ public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+ throw new UnsupportedOperationException("Side output not supported for MockOutput");
+ }
+
+ @Override
public void emitWatermark(Watermark mark) {
throw new RuntimeException("THIS MUST BE IMPLEMENTED");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 084d389..c4a38c0 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
import org.apache.flink.streaming.util.serialization.SerializationSchema
-import org.apache.flink.util.Collector
+import org.apache.flink.util.{Collector, OutputTag}
import scala.collection.JavaConverters._
@@ -239,6 +239,12 @@ class DataStream[T](stream: JavaStream[T]) {
this
}
+ @PublicEvolving
+ def getSideOutput[X: OutputTag](tag: OutputTag[X]): DataStream[X] = javaStream match {
+ case stream : SingleOutputStreamOperator[X] =>
+ asScalaStream(stream.getSideOutput(tag: OutputTag[X]))
+ }
+
/**
* Sets an user provided hash for this operator. This will be used AS IS the create
* the JobVertexID.