You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:43 UTC

[04/11] flink git commit: [FLINK-4460] Add support for side outputs

[FLINK-4460] Add support for side outputs

This does not yet allow users to emit to side outputs in user functions.
Only operators (StreamOperator) can emit to side outputs. A side output
can be retrieved on a SingleOutputStreamOperator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e134d275
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e134d275
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e134d275

Branch: refs/heads/master
Commit: e134d27589ead89882d94969edeeb171ee4433b1
Parents: f31a55e
Author: Chen Qin <qi...@gmail.com>
Authored: Fri Oct 21 12:38:04 2016 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/typeinfo/TypeHint.java     |  9 +-
 .../java/org/apache/flink/util/OutputTag.java   | 98 ++++++++++++++++++++
 .../api/collector/selector/DirectedOutput.java  |  6 ++
 .../datastream/SingleOutputStreamOperator.java  | 14 +++
 .../flink/streaming/api/graph/StreamConfig.java | 18 +++-
 .../flink/streaming/api/graph/StreamEdge.java   | 17 +++-
 .../flink/streaming/api/graph/StreamGraph.java  | 46 +++++++--
 .../api/graph/StreamGraphGenerator.java         | 34 ++++++-
 .../api/graph/StreamingJobGraphGenerator.java   | 20 ++++
 .../api/operators/AbstractStreamOperator.java   |  7 ++
 .../flink/streaming/api/operators/Output.java   |  9 ++
 .../SideOutputTransformation.java               | 73 +++++++++++++++
 .../runtime/io/RecordWriterOutput.java          | 30 +++++-
 .../streaming/runtime/tasks/OperatorChain.java  | 42 +++++++--
 .../runtime/tasks/StreamIterationTail.java      |  7 ++
 .../runtime/tasks/OneInputStreamTaskTest.java   |  6 +-
 .../runtime/tasks/StreamTaskTestHarness.java    |  3 +-
 .../tasks/TwoInputStreamTaskTestHarness.java    |  6 +-
 .../util/AbstractStreamOperatorTestHarness.java | 32 ++++++-
 .../flink/streaming/util/CollectorOutput.java   |  6 ++
 .../apache/flink/streaming/util/MockOutput.java |  6 ++
 .../flink/streaming/api/scala/DataStream.scala  |  8 +-
 22 files changed, 467 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
index 975d6e3..0f40710 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
@@ -46,7 +46,14 @@ public abstract class TypeHint<T> {
 	public TypeHint() {
 		this.typeInfo = TypeExtractor.createTypeInfo(this, TypeHint.class, getClass(), 0);
 	}
-	
+
+	/**
+	 * Creates a hint for the generic type in the class signature.
+	 */
+	public TypeHint(Class<?> baseClass, Object instance, int genericParameterPos) {
+		this.typeInfo = TypeExtractor.createTypeInfo(instance, baseClass, instance.getClass(), genericParameterPos);
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-core/src/main/java/org/apache/flink/util/OutputTag.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/OutputTag.java b/flink-core/src/main/java/org/apache/flink/util/OutputTag.java
new file mode 100644
index 0000000..eda3ab8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/OutputTag.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){};
+ * }</pre>
+ *
+ * @param <T> the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag<T> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String id;
+
+	private transient TypeInformation<T> typeInfo;
+
+	/**
+	 * Creates a new named {@code OutputTag} with the given id.
+	 *
+	 * @param id The id of the created {@code OutputTag}.
+     */
+	public OutputTag(String id) {
+		this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+
+		try {
+			TypeHint<T> typeHint = new TypeHint<T>(OutputTag.class, this, 0) {};
+			this.typeInfo = typeHint.getTypeInfo();
+		} catch (InvalidTypesException e) {
+			throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+					"OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+		}
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		typeInfo = null;
+	}
+
+	public String getId() {
+		return id;
+	}
+
+	public TypeInformation<T> getTypeInfo() {
+		return typeInfo;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof OutputTag
+			&& ((OutputTag) obj).id.equals(this.id);
+	}
+
+	@Override
+	public int hashCode() {
+		return id.hashCode();
+	}
+
+	@Override
+	public String toString() {
+		return "OutputTag(" + getTypeInfo() + ", " + id + ")";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
index 24f1c63..dabe804 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.XORShiftRandom;
 
 
@@ -139,6 +140,11 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
 	}
 
 	@Override
+	public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+		throw new UnsupportedOperationException("Cannot use split/select with side outputs.");
+	}
+
+	@Override
 	public void close() {
 		for (Output<StreamRecord<OUT>> out : allOutputs) {
 			out.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index f1d5e3a..e61b39a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -27,6 +28,7 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.Preconditions;
@@ -416,4 +418,16 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 		transformation.setSlotSharingGroup(slotSharingGroup);
 		return this;
 	}
+
+	/**
+	 * Gets the {@link DataStream} that contains the elements that are emitted from an operation
+	 * into the side output with the given {@link OutputTag}.
+	 *
+	 * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
+	 */
+	public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag){
+		sideOutputTag = clean(sideOutputTag);
+		SideOutputTransformation<X> sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), requireNonNull(sideOutputTag));
+		return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index dd4c55c..93a6387 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -38,6 +38,8 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
 
 @Internal
 public class StreamConfig implements Serializable {
@@ -63,6 +65,7 @@ public class StreamConfig implements Serializable {
 	private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
 	private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
 	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
+	private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_";
 	private static final String ITERATON_WAIT = "iterationWait";
 	private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
 	private static final String EDGES_IN_ORDER = "edgesInOrder";
@@ -139,6 +142,10 @@ public class StreamConfig implements Serializable {
 	public void setTypeSerializerOut(TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
 	}
+
+	public void setTypeSerializerSideOut(OutputTag<?> outputTag, TypeSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), serializer);
+	}
 	
 	public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
 		try {
@@ -155,7 +162,7 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not instantiate serializer.", e);
 		}
 	}
-	
+
 	public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
 		try {
 			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
@@ -164,6 +171,15 @@ public class StreamConfig implements Serializable {
 		}
 	}
 
+	public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) {
+		Preconditions.checkNotNull(outputTag, "Side output id must not be null.");
+		try {
+			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate serializer.", e);
+		}
+	}
+
 	private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
 		try {
 			InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index e2bcac1..8e1c361 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.List;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
 /**
@@ -48,15 +49,25 @@ public class StreamEdge implements Serializable {
 	 * output selection).
 	 */
 	private final List<String> selectedNames;
+
+	/**
+	 * The side-output tag (if any) of this {@link StreamEdge}.
+	 */
+	private final OutputTag outputTag;
+
+	/**
+	 * The {@link StreamPartitioner} on this {@link StreamEdge}.
+	 */
 	private StreamPartitioner<?> outputPartitioner;
 
 	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
-			List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
+			List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) {
 		this.sourceVertex = sourceVertex;
 		this.targetVertex = targetVertex;
 		this.typeNumber = typeNumber;
 		this.selectedNames = selectedNames;
 		this.outputPartitioner = outputPartitioner;
+		this.outputTag = outputTag;
 
 		this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
 				+ "_" + outputPartitioner;
@@ -86,6 +97,8 @@ public class StreamEdge implements Serializable {
 		return selectedNames;
 	}
 
+	public OutputTag getOutputTag() {return this.outputTag;}
+
 	public StreamPartitioner<?> getPartitioner() {
 		return outputPartitioner;
 	}
@@ -117,6 +130,6 @@ public class StreamEdge implements Serializable {
 	public String toString() {
 		return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
 				+ ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
-				+ ')';
+				+ ", outputTag=" + outputTag + ')';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index c8d5340..e792a5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -85,6 +86,7 @@ public class StreamGraph extends StreamingPlan {
 	private Set<Integer> sources;
 	private Set<Integer> sinks;
 	private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
+	private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
 	private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;
 
 	protected Map<Integer, String> vertexIDtoBrokerID;
@@ -110,6 +112,7 @@ public class StreamGraph extends StreamingPlan {
 	public void clear() {
 		streamNodes = new HashMap<>();
 		virtualSelectNodes = new HashMap<>();
+		virtualSideOutputNodes = new HashMap<>();
 		virtualPartitionNodes = new HashMap<>();
 		vertexIDtoBrokerID = new HashMap<>();
 		vertexIDtoLoopTimeout  = new HashMap<>();
@@ -294,6 +297,23 @@ public class StreamGraph extends StreamingPlan {
 	}
 
 	/**
+	 * Adds a new virtual node that is used to connect a downstream vertex to only the outputs with
+	 * the selected side-output {@link OutputTag}.
+	 *
+	 * @param originalId ID of the node that should be connected to.
+	 * @param virtualId ID of the virtual node.
+	 * @param outputTag The selected side-output {@code OutputTag}.
+	 */
+	public void addVirtualSideOutputNode(Integer originalId, Integer virtualId, OutputTag outputTag) {
+
+		if (virtualSideOutputNodes.containsKey(virtualId)) {
+			throw new IllegalStateException("Already has virtual output node with id " + virtualId);
+		}
+
+		virtualSideOutputNodes.put(virtualId, new Tuple2<>(originalId, outputTag));
+	}
+
+	/**
 	 * Adds a new virtual node that is used to connect a downstream vertex to an input with a certain
 	 * partitioning.
 	 *
@@ -318,7 +338,10 @@ public class StreamGraph extends StreamingPlan {
 	 * Determines the slot sharing group of an operation across virtual nodes.
 	 */
 	public String getSlotSharingGroup(Integer id) {
-		if (virtualSelectNodes.containsKey(id)) {
+		if (virtualSideOutputNodes.containsKey(id)) {
+			Integer mappedId = virtualSideOutputNodes.get(id).f0;
+			return getSlotSharingGroup(mappedId);
+		} else if (virtualSelectNodes.containsKey(id)) {
 			Integer mappedId = virtualSelectNodes.get(id).f0;
 			return getSlotSharingGroup(mappedId);
 		} else if (virtualPartitionNodes.containsKey(id)) {
@@ -335,7 +358,7 @@ public class StreamGraph extends StreamingPlan {
 				downStreamVertexID,
 				typeNumber,
 				null,
-				new ArrayList<String>());
+				new ArrayList<String>(), null);
 
 	}
 
@@ -343,24 +366,31 @@ public class StreamGraph extends StreamingPlan {
 			Integer downStreamVertexID,
 			int typeNumber,
 			StreamPartitioner<?> partitioner,
-			List<String> outputNames) {
+			List<String> outputNames,
+			OutputTag outputTag) {
 
-
-		if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+		if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
+			int virtualId = upStreamVertexID;
+			upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
+			if (outputTag == null) {
+				outputTag = virtualSideOutputNodes.get(virtualId).f1;
+			}
+			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
+		} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
 			int virtualId = upStreamVertexID;
 			upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
 			if (outputNames.isEmpty()) {
 				// selections that happen downstream override earlier selections
 				outputNames = virtualSelectNodes.get(virtualId).f1;
 			}
-			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
 		} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
 			int virtualId = upStreamVertexID;
 			upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
 			if (partitioner == null) {
 				partitioner = virtualPartitionNodes.get(virtualId).f1;
 			}
-			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
 		} else {
 			StreamNode upstreamNode = getStreamNode(upStreamVertexID);
 			StreamNode downstreamNode = getStreamNode(downStreamVertexID);
@@ -382,7 +412,7 @@ public class StreamGraph extends StreamingPlan {
 				}
 			}
 
-			StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner);
+			StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
 
 			getStreamNode(edge.getSourceId()).addOutEdge(edge);
 			getStreamNode(edge.getTargetId()).addInEdge(edge);

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 2defbef..df10ae4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.SelectTransformation;
+import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.api.transformations.SplitTransformation;
@@ -184,6 +185,8 @@ public class StreamGraphGenerator {
 			transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
 		} else if (transform instanceof PartitionTransformation<?>) {
 			transformedIds = transformPartition((PartitionTransformation<?>) transform);
+		} else if (transform instanceof SideOutputTransformation<?>) {
+			transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
 		} else {
 			throw new IllegalStateException("Unknown transformation: " + transform);
 		}
@@ -302,6 +305,35 @@ public class StreamGraphGenerator {
 	}
 
 	/**
+	 * Transforms a {@code SideOutputTransformation}.
+	 *
+	 * <p>
+	 * For this we create a virtual node in the {@code StreamGraph} that holds the side-output
+	 * {@link org.apache.flink.util.OutputTag}.
+	 *
+	 * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
+	 */
+	private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
+		StreamTransformation<?> input = sideOutput.getInput();
+		Collection<Integer> resultIds = transform(input);
+
+
+		// the recursive transform might have already transformed this
+		if (alreadyTransformed.containsKey(sideOutput)) {
+			return alreadyTransformed.get(sideOutput);
+		}
+
+		List<Integer> virtualResultIds = new ArrayList<>();
+
+		for (int inputId : resultIds) {
+			int virtualId = StreamTransformation.getNewNodeId();
+			streamGraph.addVirtualSideOutputNode(inputId, virtualId, sideOutput.getOutputTag());
+			virtualResultIds.add(virtualId);
+		}
+		return virtualResultIds;
+	}
+
+	/**
 	 * Transforms a {@code FeedbackTransformation}.
 	 *
 	 * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 60f8faa..9cf0432 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -374,6 +374,25 @@ public class StreamingJobGraphGenerator {
 		config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
 		config.setTypeSerializerOut(vertex.getTypeSerializerOut());
 
+		// iterate over the edges and, for each side-output edge, create and save a serializer for its OutputTag type
+		for (StreamEdge edge : chainableOutputs) {
+			if (edge.getOutputTag() != null) {
+				config.setTypeSerializerSideOut(
+					edge.getOutputTag(),
+					edge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig())
+				);
+			}
+		}
+		for (StreamEdge edge : nonChainableOutputs) {
+			if (edge.getOutputTag() != null) {
+				config.setTypeSerializerSideOut(
+						edge.getOutputTag(),
+						edge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig())
+				);
+			}
+		}
+
+
 		config.setStreamOperator(vertex.getOperator());
 		config.setOutputSelectors(vertex.getOutputSelectors());
 
@@ -469,6 +488,7 @@ public class StreamingJobGraphGenerator {
 					headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
 				&& (edge.getPartitioner() instanceof ForwardPartitioner)
 				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
+				&& edge.getOutputTag() == null // disable chaining for side outputs
 				&& streamGraph.isChainingEnabled();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index b6f86a8..e40a59b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -66,6 +66,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -862,6 +863,12 @@ public abstract class AbstractStreamOperator<OUT>
 		}
 
 		@Override
+		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+			numRecordsOut.inc();
+			output.collect(outputTag, record);
+		}
+
+		@Override
 		public void close() {
 			output.close();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
index ec2409e..eb10d8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
@@ -20,7 +20,9 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 /**
  * A {@link org.apache.flink.streaming.api.operators.StreamOperator} is supplied with an object
@@ -41,5 +43,12 @@ public interface Output<T> extends Collector<T> {
 	 */
 	void emitWatermark(Watermark mark);
 
+	/**
+	 * Emits a record to the side output identified by the given {@link OutputTag}.
+	 *
+	 * @param record The record to collect.
+	 */
+	<X> void collect(OutputTag<?> outputTag, StreamRecord<X> record);
+
 	void emitLatencyMarker(LatencyMarker latencyMarker);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java
new file mode 100644
index 0000000..a90b40d
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * This transformation represents a selection of a side output of an upstream operation with a
+ * given {@link OutputTag}.
+ *
+ * <p>This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code SideOutputTransformation}
+ */
+public class SideOutputTransformation<T> extends StreamTransformation<T> {
+	private final StreamTransformation<?> input;
+
+	private final OutputTag<T> tag;
+
+	public SideOutputTransformation(StreamTransformation<?> input, final OutputTag<T> tag) {
+		super("SideOutput", tag.getTypeInfo(), requireNonNull(input).getParallelism());
+		this.input = input;
+		this.tag = requireNonNull(tag);
+	}
+
+	/**
+	 * Returns the input {@code StreamTransformation}.
+	 */
+	public StreamTransformation<?> getInput() {
+		return input;
+	}
+
+	public OutputTag<T> getOutputTag() {
+		return tag;
+	}
+
+	@Override
+	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+		List<StreamTransformation<?>> result = Lists.newArrayList();
+		result.add(this);
+		result.addAll(input.getTransitivePredecessors());
+		return result;
+	}
+
+	@Override
+	public final void setChainingStrategy(ChainingStrategy strategy) {
+		throw new UnsupportedOperationException("Cannot set chaining strategy on SideOutput Transformation.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 51c6cd7..d22c60d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 import java.io.IOException;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -28,9 +29,9 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -46,15 +47,18 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 	private SerializationDelegate<StreamElement> serializationDelegate;
 
 	private final StreamStatusProvider streamStatusProvider;
-	
+
+	private final OutputTag outputTag;
+
 	@SuppressWarnings("unchecked")
 	public RecordWriterOutput(
 			StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
 			TypeSerializer<OUT> outSerializer,
+			OutputTag outputTag,
 			StreamStatusProvider streamStatusProvider) {
 
 		checkNotNull(recordWriter);
-		
+		this.outputTag = outputTag;
 		// generic hack: cast the writer to generic Object type so we can use it 
 		// with multiplexed records and watermarks
 		this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>) 
@@ -72,6 +76,26 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 
 	@Override
 	public void collect(StreamRecord<OUT> record) {
+		if (this.outputTag != null) {
+			// we serve a side output, so we are not responsible for emitting to the main output
+			return;
+		}
+
+		pushToRecordWriter(record);
+	}
+
+	@Override
+	public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+		if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+			// we are only responsible for emitting to the side-output specified by our
+			// OutputTag.
+			return;
+		}
+
+		pushToRecordWriter(record);
+	}
+
+	private <X> void pushToRecordWriter(StreamRecord<X> record) {
 		serializationDelegate.setInstance(record);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index dd93592..e21393a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
@@ -350,9 +351,22 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 	private <T> RecordWriterOutput<T> createStreamOutput(
 			StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
 			Environment taskEnvironment,
-			String taskName)
-	{
-		TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+			String taskName) {
+		OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
+
+		TypeSerializer outSerializer = null;
+
+		if (edge.getOutputTag() != null) {
+			// side output
+			outSerializer =
+					upStreamConfig.getTypeSerializerSideOut(
+							edge.getOutputTag(),
+							taskEnvironment.getUserClassLoader());
+		} else {
+			// main output
+			outSerializer =
+					upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+		}
 
 		@SuppressWarnings("unchecked")
 		StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
@@ -369,11 +383,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			}
 		}
 
-		StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output = 
+		StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
 				new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
 		output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
 		
-		return new RecordWriterOutput<>(output, outSerializer, this);
+		return new RecordWriterOutput<>(output, outSerializer, sideOutputTag, this);
 	}
 	
 	// ------------------------------------------------------------------------
@@ -406,6 +420,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 
 		@Override
+		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+			// ignore
+		}
+
+		@Override
 		public void emitWatermark(Watermark mark) {
 			try {
 				if (streamStatusProvider.getStreamStatus().isActive()) {
@@ -457,8 +476,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
 				operator.setKeyContextElement1(copy);
 				operator.processElement(copy);
-			}
-			catch (Exception e) {
+			} catch (Exception e) {
 				throw new RuntimeException("Could not forward element to next operator", e);
 			}
 		}
@@ -471,7 +489,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		private final Random RNG = new XORShiftRandom();
 
 		private final StreamStatusProvider streamStatusProvider;
-		
+
 		public BroadcastingOutputCollector(
 				Output<StreamRecord<T>>[] outputs,
 				StreamStatusProvider streamStatusProvider) {
@@ -508,6 +526,14 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 
 		@Override
+		public <X> void collect(
+				OutputTag<?> outputTag, StreamRecord<X> record) {
+			for (Output<StreamRecord<T>> output : outputs) {
+				output.collect(outputTag, record);
+			}
+		}
+
+		@Override
 		public void close() {
 			for (Output<StreamRecord<T>> output : outputs) {
 				output.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index cdac11a..35d14e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,6 +123,12 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		}
 
 		@Override
+		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+			throw new UnsupportedOperationException("Side outputs not used in iteration tail");
+
+		}
+
+		@Override
 		public void close() {
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index e22bf86..4e8bfd8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -270,6 +270,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 			new StreamNode(null, 1, null, null, null, null, null),
 			0,
 			Collections.<String>emptyList(),
+			null,
 			null
 		)));
 
@@ -281,6 +282,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 			new StreamNode(null, 2, null, null, null, null, null),
 			0,
 			Collections.<String>emptyList(),
+			null,
 			null
 		)));
 
@@ -290,7 +292,8 @@ public class OneInputStreamTaskTest extends TestLogger {
 			new StreamNode(null, 3, null, null, null, null, null),
 			0,
 			Collections.<String>emptyList(),
-			new BroadcastPartitioner<Object>()));
+			new BroadcastPartitioner<Object>(),
+			null));
 
 		tailOperatorConfig.setStreamOperator(tailOperator);
 		tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
@@ -641,6 +644,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 				),
 				0,
 				Collections.<String>emptyList(),
+				null,
 				null
 			);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index e58bc5a..c51af4e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -153,7 +153,8 @@ public class StreamTaskTestHarness<OUT> {
 		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
 		StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
 		StreamNode targetVertexDummy = new StreamNode(null, 1, "group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-		outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
+
+		outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */));
 
 		streamConfig.setOutEdgesInOrder(outEdgesInOrder);
 		streamConfig.setNonChainedOutputs(outEdgesInOrder);

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index f5b7566..2167652 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -128,7 +128,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
 							targetVertexDummy,
 							1,
 							new LinkedList<String>(),
-							new BroadcastPartitioner<Object>());
+							new BroadcastPartitioner<Object>(),
+							null /* output tag */);
 
 					inPhysicalEdges.add(streamEdge);
 					break;
@@ -143,7 +144,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
 							targetVertexDummy,
 							2,
 							new LinkedList<String>(),
-							new BroadcastPartitioner<Object>());
+							new BroadcastPartitioner<Object>(),
+							null /* output tag */);
 
 					inPhysicalEdges.add(streamEdge);
 					break;

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index f0a4c42..74012b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
@@ -70,8 +71,10 @@ import java.io.FileInputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.mockito.Matchers.any;
@@ -88,6 +91,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 	final protected ConcurrentLinkedQueue<Object> outputList;
 
+	final protected Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> sideOutputLists;
+
 	final protected StreamConfig config;
 
 	final protected ExecutionConfig executionConfig;
@@ -147,6 +152,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 			final Environment environment) throws Exception {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<>();
+		this.sideOutputLists = new HashMap<>();
+
 		Configuration underlyingConfig = environment.getTaskConfiguration();
 		this.config = new StreamConfig(underlyingConfig);
 		this.config.setCheckpointingEnabled(true);
@@ -263,6 +270,10 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		return outputList;
 	}
 
+	public ConcurrentLinkedQueue<Object> getSideOutput(OutputTag tag) {
+		return sideOutputLists.get(tag);
+	}
+
 	/**
 	 * Get only the {@link StreamRecord StreamRecords} emitted by the operator.
 	 */
@@ -610,6 +621,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 		private TypeSerializer<OUT> outputSerializer;
 
+		private TypeSerializer sideOutputSerializer;
+
 		MockOutput() {
 			this(null);
 		}
@@ -634,13 +647,30 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 				outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
 			}
 			if (element.hasTimestamp()) {
-				outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),element.getTimestamp()));
+				outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()), element.getTimestamp()));
 			} else {
 				outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue())));
 			}
 		}
 
 		@Override
+		public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+			sideOutputSerializer = TypeExtractor.getForObject(record.getValue()).createSerializer(executionConfig);
+
+			ConcurrentLinkedQueue<Object> sideOutputList = sideOutputLists.get(outputTag);
+			if (sideOutputList == null) {
+				sideOutputList = new ConcurrentLinkedQueue<>();
+				sideOutputLists.put(outputTag, sideOutputList);
+			}
+			if (record.hasTimestamp()) {
+				sideOutputList.add(new StreamRecord<>(sideOutputSerializer.copy(record.getValue()), record.getTimestamp()));
+			} else {
+				sideOutputList.add(new StreamRecord<>(sideOutputSerializer.copy(record.getValue())));
+			}
+
+		}
+
+		@Override
 		public void close() {
 			// ignore
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
index fcc8a6c..07b37c8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 
 import java.io.Serializable;
 import java.util.List;
@@ -53,5 +54,10 @@ public class CollectorOutput<T> implements Output<StreamRecord<T>> {
 	}
 
 	@Override
+	public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+		throw new UnsupportedOperationException("Side output not supported for CollectorOutput");
+	}
+
+	@Override
 	public void close() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
index bf3a488..867080c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 
 public class MockOutput<T> implements Output<StreamRecord<T>> {
 	private Collection<T> outputs;
@@ -40,6 +41,11 @@ public class MockOutput<T> implements Output<StreamRecord<T>> {
 	}
 
 	@Override
+	public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+		throw new UnsupportedOperationException("Side output not supported for MockOutput");
+	}
+
+	@Override
 	public void emitWatermark(Watermark mark) {
 		throw new RuntimeException("THIS MUST BE IMPLEMENTED");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e134d275/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 084d389..c4a38c0 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
-import org.apache.flink.util.Collector
+import org.apache.flink.util.{Collector, OutputTag}
 
 import scala.collection.JavaConverters._
 
@@ -239,6 +239,12 @@ class DataStream[T](stream: JavaStream[T]) {
     this
   }
 
+  @PublicEvolving
+  def getSideOutput[X: OutputTag](tag: OutputTag[X]): DataStream[X] = javaStream match {
+    case stream : SingleOutputStreamOperator[X] =>
+      asScalaStream(stream.getSideOutput(tag: OutputTag[X]))
+  }
+
   /**
     * Sets an user provided hash for this operator. This will be used AS IS the create
     * the JobVertexID.