You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/06 13:28:45 UTC

[1/3] beam git commit: Remove the FnOutputT parameter from DoFnOperator

Repository: beam
Updated Branches:
  refs/heads/master 88f78fa2f -> aebd3a4c5


Remove the FnOutputT parameter from DoFnOperator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e8f26085
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e8f26085
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e8f26085

Branch: refs/heads/master
Commit: e8f26085e889f8f618c0961a5458cbc42b432c01
Parents: b0601fd
Author: JingsongLi <lz...@aliyun.com>
Authored: Tue Jun 6 17:31:09 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 6 14:33:36 2017 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 10 +++++-----
 .../wrappers/streaming/DoFnOperator.java        | 20 ++++++++++----------
 .../streaming/SplittableDoFnOperator.java       | 12 ++++++------
 .../wrappers/streaming/WindowDoFnOperator.java  |  2 +-
 .../beam/runners/flink/PipelineOptionsTest.java |  6 +++---
 .../flink/streaming/DoFnOperatorTest.java       | 11 +++++------
 6 files changed, 30 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index d8c3049..2a7c5d6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -332,7 +332,7 @@ class FlinkStreamingTransformTranslators {
   static class ParDoTranslationHelper {
 
     interface DoFnOperatorFactory<InputT, OutputT> {
-      DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator(
+      DoFnOperator<InputT, OutputT> createDoFnOperator(
           DoFn<InputT, OutputT> doFn,
           String stepName,
           List<PCollectionView<?>> sideInputs,
@@ -395,7 +395,7 @@ class FlinkStreamingTransformTranslators {
               context.getCoder((PCollection<OutputT>) outputs.get(mainOutputTag)));
 
       if (sideInputs.isEmpty()) {
-        DoFnOperator<InputT, OutputT, OutputT> doFnOperator =
+        DoFnOperator<InputT, OutputT> doFnOperator =
             doFnOperatorFactory.createDoFnOperator(
                 doFn,
                 context.getCurrentTransform().getFullName(),
@@ -416,7 +416,7 @@ class FlinkStreamingTransformTranslators {
         Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
             transformSideInputs(sideInputs, context);
 
-        DoFnOperator<InputT, OutputT, OutputT> doFnOperator =
+        DoFnOperator<InputT, OutputT> doFnOperator =
             doFnOperatorFactory.createDoFnOperator(
                 doFn,
                 context.getCurrentTransform().getFullName(),
@@ -493,7 +493,7 @@ class FlinkStreamingTransformTranslators {
           context,
           new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
             @Override
-            public DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator(
+            public DoFnOperator<InputT, OutputT> createDoFnOperator(
                 DoFn<InputT, OutputT> doFn,
                 String stepName,
                 List<PCollectionView<?>> sideInputs,
@@ -547,7 +547,7 @@ class FlinkStreamingTransformTranslators {
             @Override
             public DoFnOperator<
                 KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
-                OutputT, OutputT> createDoFnOperator(
+                OutputT> createDoFnOperator(
                     DoFn<
                         KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
                         OutputT> doFn,

http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8c27ed9..350f323 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -94,21 +94,21 @@ import org.joda.time.Instant;
  * Flink operator for executing {@link DoFn DoFns}.
  *
  * @param <InputT> the input type of the {@link DoFn}
- * @param <FnOutputT> the output type of the {@link DoFn}
+ * @param <OutputT> the output type of the {@link DoFn}
  * @param <OutputT> the output type of the operator, this can be different from the fn output
  *                 type when we have additional tagged outputs
  */
-public class DoFnOperator<InputT, FnOutputT, OutputT>
+public class DoFnOperator<InputT, OutputT>
     extends AbstractStreamOperator<WindowedValue<OutputT>>
     implements OneInputStreamOperator<WindowedValue<InputT>, WindowedValue<OutputT>>,
       TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, WindowedValue<OutputT>>,
     KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> {
 
-  protected DoFn<InputT, FnOutputT> doFn;
+  protected DoFn<InputT, OutputT> doFn;
 
   protected final SerializedPipelineOptions serializedOptions;
 
-  protected final TupleTag<FnOutputT> mainOutputTag;
+  protected final TupleTag<OutputT> mainOutputTag;
   protected final List<TupleTag<?>> additionalOutputTags;
 
   protected final Collection<PCollectionView<?>> sideInputs;
@@ -118,8 +118,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected final OutputManagerFactory<OutputT> outputManagerFactory;
 
-  protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
-  protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
+  protected transient DoFnRunner<InputT, OutputT> doFnRunner;
+  protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
 
   protected transient SideInputHandler sideInputHandler;
 
@@ -127,7 +127,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected transient DoFnRunners.OutputManager outputManager;
 
-  private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker;
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
 
   protected transient long currentInputWatermark;
 
@@ -156,10 +156,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   private transient Optional<Long> pushedBackWatermark;
 
   public DoFnOperator(
-      DoFn<InputT, FnOutputT> doFn,
+      DoFn<InputT, OutputT> doFn,
       String stepName,
       Coder<WindowedValue<InputT>> inputCoder,
-      TupleTag<FnOutputT> mainOutputTag,
+      TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       OutputManagerFactory<OutputT> outputManagerFactory,
       WindowingStrategy<?, ?> windowingStrategy,
@@ -192,7 +192,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   // allow overriding this in WindowDoFnOperator because this one dynamically creates
   // the DoFn
-  protected DoFn<InputT, FnOutputT> getDoFn() {
+  protected DoFn<InputT, OutputT> getDoFn() {
     return doFn;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 4ac2ff5..5d08eba 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -55,19 +55,19 @@ import org.joda.time.Instant;
  * the {@code @ProcessElement} method of a splittable {@link DoFn}.
  */
 public class SplittableDoFnOperator<
-    InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+    InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
     extends DoFnOperator<
-    KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT, OutputT> {
+    KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
 
   private transient ScheduledExecutorService executorService;
 
   public SplittableDoFnOperator(
-      DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn,
+      DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> doFn,
       String stepName,
       Coder<
           WindowedValue<
               KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
-      TupleTag<FnOutputT> mainOutputTag,
+      TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       OutputManagerFactory<OutputT> outputManagerFactory,
       WindowingStrategy<?, ?> windowingStrategy,
@@ -120,10 +120,10 @@ public class SplittableDoFnOperator<
         new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
             doFn,
             serializedOptions.getPipelineOptions(),
-            new OutputWindowedValue<FnOutputT>() {
+            new OutputWindowedValue<OutputT>() {
               @Override
               public void outputWindowedValue(
-                  FnOutputT output,
+                  OutputT output,
                   Instant timestamp,
                   Collection<? extends BoundedWindow> windows,
                   PaneInfo pane) {

http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index ea578b9..78d585e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.operators.InternalTimer;
  * Flink operator for executing window {@link DoFn DoFns}.
  */
 public class WindowDoFnOperator<K, InputT, OutputT>
-    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, KV<K, OutputT>> {
+    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
 
   private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index bc0b1c2..d0281ec 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -136,7 +136,7 @@ public class PipelineOptionsTest {
 
   @Test(expected = Exception.class)
   public void parDoBaseClassPipelineOptionsNullTest() {
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new TestDoFn(),
         "stepName",
         WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
@@ -157,7 +157,7 @@ public class PipelineOptionsTest {
   @Test
   public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
 
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new TestDoFn(),
         "stepName",
         WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
@@ -173,7 +173,7 @@ public class PipelineOptionsTest {
     final byte[] serialized = SerializationUtils.serialize(doFnOperator);
 
     @SuppressWarnings("unchecked")
-    DoFnOperator<Object, Object, Object> deserialized = SerializationUtils.deserialize(serialized);
+    DoFnOperator<Object, Object> deserialized = SerializationUtils.deserialize(serialized);
 
     TypeInformation<WindowedValue<Object>> typeInformation = TypeInformation.of(
         new TypeHint<WindowedValue<Object>>() {});

http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 132242e..ad9d236 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -111,7 +111,7 @@ public class DoFnOperatorTest {
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new IdentityDoFn<String>(),
         "stepName",
         windowedValueCoder,
@@ -155,7 +155,7 @@ public class DoFnOperatorTest {
             .put(additionalOutput2, new OutputTag<String>(additionalOutput2.getId()){})
             .build();
 
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new MultiOutputDoFn(additionalOutput1, additionalOutput2),
         "stepName",
         windowedValueCoder,
@@ -223,7 +223,7 @@ public class DoFnOperatorTest {
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
-    DoFnOperator<Integer, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<Integer, String> doFnOperator = new DoFnOperator<>(
         fn,
         "stepName",
         windowedValueCoder,
@@ -335,8 +335,7 @@ public class DoFnOperatorTest {
 
     TupleTag<KV<String, Integer>> outputTag = new TupleTag<>("main-output");
 
-    DoFnOperator<
-        KV<String, Integer>, KV<String, Integer>, KV<String, Integer>> doFnOperator =
+    DoFnOperator<KV<String, Integer>, KV<String, Integer>> doFnOperator =
         new DoFnOperator<>(
             fn,
             "stepName",
@@ -433,7 +432,7 @@ public class DoFnOperatorTest {
       keyCoder = StringUtf8Coder.of();
     }
 
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new IdentityDoFn<String>(),
         "stepName",
         windowedValueCoder,


[3/3] beam git commit: This closes #3290

Posted by al...@apache.org.
This closes #3290


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aebd3a4c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aebd3a4c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aebd3a4c

Branch: refs/heads/master
Commit: aebd3a4c5d416ed6666d9f829e9ac1aecb230ac6
Parents: 88f78fa e8f2608
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jun 6 14:33:46 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 6 14:33:46 2017 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 145 ++++++-------------
 .../wrappers/streaming/DoFnOperator.java        |  60 +++++---
 .../streaming/SplittableDoFnOperator.java       |  12 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   4 +-
 .../beam/runners/flink/PipelineOptionsTest.java |   9 +-
 .../flink/streaming/DoFnOperatorTest.java       |  70 +++++----
 6 files changed, 132 insertions(+), 168 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: [BEAM-1498] Use Flink-native side outputs

Posted by al...@apache.org.
[BEAM-1498] Use Flink-native side outputs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0601fd4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0601fd4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0601fd4

Branch: refs/heads/master
Commit: b0601fd43e0929e8b925dbe566e564460f91d9fc
Parents: 88f78fa
Author: JingsongLi <lz...@aliyun.com>
Authored: Sun Jun 4 21:56:10 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 6 14:33:36 2017 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 145 ++++++-------------
 .../wrappers/streaming/DoFnOperator.java        |  40 +++--
 .../wrappers/streaming/WindowDoFnOperator.java  |   4 +-
 .../beam/runners/flink/PipelineOptionsTest.java |   5 +-
 .../flink/streaming/DoFnOperatorTest.java       |  65 +++++----
 5 files changed, 112 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 00e9934..d8c3049 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -18,9 +18,6 @@
 
 package org.apache.beam.runners.flink;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -29,7 +26,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.SystemReduceFn;
@@ -84,16 +80,15 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 /**
  * This class contains all the mappings between Beam and Flink
@@ -337,7 +332,7 @@ class FlinkStreamingTransformTranslators {
   static class ParDoTranslationHelper {
 
     interface DoFnOperatorFactory<InputT, OutputT> {
-      DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+      DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator(
           DoFn<InputT, OutputT> doFn,
           String stepName,
           List<PCollectionView<?>> sideInputs,
@@ -345,7 +340,7 @@ class FlinkStreamingTransformTranslators {
           List<TupleTag<?>> additionalOutputTags,
           FlinkStreamingTranslationContext context,
           WindowingStrategy<?, ?> windowingStrategy,
-          Map<TupleTag<?>, Integer> tagsToLabels,
+          Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToLabels,
           Coder<WindowedValue<InputT>> inputCoder,
           Coder keyCoder,
           Map<Integer, PCollectionView<?>> transformedSideInputs);
@@ -354,7 +349,6 @@ class FlinkStreamingTransformTranslators {
     static <InputT, OutputT> void translateParDo(
         String transformName,
         DoFn<InputT, OutputT> doFn,
-        String stepName,
         PCollection<InputT> input,
         List<PCollectionView<?>> sideInputs,
         Map<TupleTag<?>, PValue> outputs,
@@ -366,10 +360,15 @@ class FlinkStreamingTransformTranslators {
       // we assume that the transformation does not change the windowing strategy.
       WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
 
-      Map<TupleTag<?>, Integer> tagsToLabels =
-          transformTupleTagsToLabels(mainOutputTag, outputs);
+      Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newHashMap();
+      for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
+        if (!tagsToOutputTags.containsKey(entry.getKey())) {
+          tagsToOutputTags.put(entry.getKey(), new OutputTag<>(entry.getKey().getId(),
+              (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue())));
+        }
+      }
 
-      SingleOutputStreamOperator<RawUnionValue> unionOutputStream;
+      SingleOutputStreamOperator<WindowedValue<OutputT>> outputStream;
 
       Coder<WindowedValue<InputT>> inputCoder = context.getCoder(input);
 
@@ -391,8 +390,12 @@ class FlinkStreamingTransformTranslators {
         stateful = true;
       }
 
+      CoderTypeInformation<WindowedValue<OutputT>> outputTypeInformation =
+          new CoderTypeInformation<>(
+              context.getCoder((PCollection<OutputT>) outputs.get(mainOutputTag)));
+
       if (sideInputs.isEmpty()) {
-        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+        DoFnOperator<InputT, OutputT, OutputT> doFnOperator =
             doFnOperatorFactory.createDoFnOperator(
                 doFn,
                 context.getCurrentTransform().getFullName(),
@@ -401,24 +404,19 @@ class FlinkStreamingTransformTranslators {
                 additionalOutputTags,
                 context,
                 windowingStrategy,
-                tagsToLabels,
+                tagsToOutputTags,
                 inputCoder,
                 keyCoder,
                 new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */);
 
-        UnionCoder outputUnionCoder = createUnionCoder(outputs);
-
-        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
-            new CoderTypeInformation<>(outputUnionCoder);
-
-        unionOutputStream = inputDataStream
-            .transform(transformName, outputUnionTypeInformation, doFnOperator);
+        outputStream = inputDataStream
+            .transform(transformName, outputTypeInformation, doFnOperator);
 
       } else {
         Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
             transformSideInputs(sideInputs, context);
 
-        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+        DoFnOperator<InputT, OutputT, OutputT> doFnOperator =
             doFnOperatorFactory.createDoFnOperator(
                 doFn,
                 context.getCurrentTransform().getFullName(),
@@ -427,16 +425,11 @@ class FlinkStreamingTransformTranslators {
                 additionalOutputTags,
                 context,
                 windowingStrategy,
-                tagsToLabels,
+                tagsToOutputTags,
                 inputCoder,
                 keyCoder,
                 transformedSideInputs.f0);
 
-        UnionCoder outputUnionCoder = createUnionCoder(outputs);
-
-        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
-            new CoderTypeInformation<>(outputUnionCoder);
-
         if (stateful) {
           // we have to manually contruct the two-input transform because we're not
           // allowed to have only one input keyed, normally.
@@ -448,83 +441,35 @@ class FlinkStreamingTransformTranslators {
               keyedStream.getTransformation(),
               transformedSideInputs.f1.broadcast().getTransformation(),
               transformName,
-              (TwoInputStreamOperator) doFnOperator,
-              outputUnionTypeInformation,
+              doFnOperator,
+              outputTypeInformation,
               keyedStream.getParallelism());
 
           rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
           rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
 
-          unionOutputStream = new SingleOutputStreamOperator(
-                  keyedStream.getExecutionEnvironment(),
-                  rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+          outputStream = new SingleOutputStreamOperator(
+              keyedStream.getExecutionEnvironment(),
+              rawFlinkTransform) {
+          }; // we have to cheat around the ctor being protected
 
           keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
 
         } else {
-          unionOutputStream = inputDataStream
+          outputStream = inputDataStream
               .connect(transformedSideInputs.f1.broadcast())
-              .transform(transformName, outputUnionTypeInformation, doFnOperator);
+              .transform(transformName, outputTypeInformation, doFnOperator);
         }
       }
 
-      SplitStream<RawUnionValue> splitStream = unionOutputStream
-              .split(new OutputSelector<RawUnionValue>() {
-                @Override
-                public Iterable<String> select(RawUnionValue value) {
-                  return Collections.singletonList(Integer.toString(value.getUnionTag()));
-                }
-              });
-
-      for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
-        final int outputTag = tagsToLabels.get(output.getKey());
-
-        TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue());
-
-        @SuppressWarnings("unchecked")
-        DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
-          .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
-            @Override
-            public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
-              out.collect(value.getValue());
-            }
-          }).returns(outputTypeInfo);
-
-        context.setOutputDataStream(output.getValue(), unwrapped);
-      }
-    }
-
-    private static Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
-        TupleTag<?> mainTag,
-        Map<TupleTag<?>, PValue> allTaggedValues) {
+      context.setOutputDataStream(outputs.get(mainOutputTag), outputStream);
 
-      Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
-      int count = 0;
-      tagToLabelMap.put(mainTag, count++);
-      for (TupleTag<?> key : allTaggedValues.keySet()) {
-        if (!tagToLabelMap.containsKey(key)) {
-          tagToLabelMap.put(key, count++);
+      for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
+        if (!entry.getKey().equals(mainOutputTag)) {
+          context.setOutputDataStream(entry.getValue(),
+              outputStream.getSideOutput(tagsToOutputTags.get(entry.getKey())));
         }
       }
-      return tagToLabelMap;
-    }
-
-    private static UnionCoder createUnionCoder(Map<TupleTag<?>, PValue> taggedCollections) {
-      List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (PValue taggedColl : taggedCollections.values()) {
-        checkArgument(
-            taggedColl instanceof PCollection,
-            "A Union Coder can only be created for a Collection of Tagged %s. Got %s",
-            PCollection.class.getSimpleName(),
-            taggedColl.getClass().getSimpleName());
-        PCollection<?> coll = (PCollection<?>) taggedColl;
-        WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
-            WindowedValue.getFullCoder(
-                coll.getCoder(),
-                coll.getWindowingStrategy().getWindowFn().windowCoder());
-        outputCoders.add(windowedValueCoder);
-      }
-      return UnionCoder.of(outputCoders);
     }
   }
 
@@ -540,7 +485,6 @@ class FlinkStreamingTransformTranslators {
       ParDoTranslationHelper.translateParDo(
           transform.getName(),
           transform.getFn(),
-          context.getCurrentTransform().getFullName(),
           (PCollection<InputT>) context.getInput(transform),
           transform.getSideInputs(),
           context.getOutputs(transform),
@@ -549,7 +493,7 @@ class FlinkStreamingTransformTranslators {
           context,
           new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
             @Override
-            public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+            public DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator(
                 DoFn<InputT, OutputT> doFn,
                 String stepName,
                 List<PCollectionView<?>> sideInputs,
@@ -557,7 +501,7 @@ class FlinkStreamingTransformTranslators {
                 List<TupleTag<?>> additionalOutputTags,
                 FlinkStreamingTranslationContext context,
                 WindowingStrategy<?, ?> windowingStrategy,
-                Map<TupleTag<?>, Integer> tagsToLabels,
+                Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
                 Coder<WindowedValue<InputT>> inputCoder,
                 Coder keyCoder,
                 Map<Integer, PCollectionView<?>> transformedSideInputs) {
@@ -567,7 +511,7 @@ class FlinkStreamingTransformTranslators {
                   inputCoder,
                   mainOutputTag,
                   additionalOutputTags,
-                  new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+                  new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags),
                   windowingStrategy,
                   transformedSideInputs,
                   sideInputs,
@@ -592,7 +536,6 @@ class FlinkStreamingTransformTranslators {
       ParDoTranslationHelper.translateParDo(
           transform.getName(),
           transform.newProcessFn(transform.getFn()),
-          context.getCurrentTransform().getFullName(),
           context.getInput(transform),
           transform.getSideInputs(),
           context.getOutputs(transform),
@@ -604,8 +547,7 @@ class FlinkStreamingTransformTranslators {
             @Override
             public DoFnOperator<
                 KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
-                OutputT,
-                RawUnionValue> createDoFnOperator(
+                OutputT, OutputT> createDoFnOperator(
                     DoFn<
                         KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
                         OutputT> doFn,
@@ -615,7 +557,7 @@ class FlinkStreamingTransformTranslators {
                     List<TupleTag<?>> additionalOutputTags,
                     FlinkStreamingTranslationContext context,
                     WindowingStrategy<?, ?> windowingStrategy,
-                    Map<TupleTag<?>, Integer> tagsToLabels,
+                    Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
                     Coder<
                         WindowedValue<
                             KeyedWorkItem<
@@ -629,7 +571,7 @@ class FlinkStreamingTransformTranslators {
                   inputCoder,
                   mainOutputTag,
                   additionalOutputTags,
-                  new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+                  new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags),
                   windowingStrategy,
                   transformedSideInputs,
                   sideInputs,
@@ -756,8 +698,7 @@ class FlinkStreamingTransformTranslators {
       TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
-      DoFnOperator.DefaultOutputManagerFactory<
-            WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory =
+      DoFnOperator.DefaultOutputManagerFactory<KV<K, Iterable<InputT>>> outputManagerFactory =
           new DoFnOperator.DefaultOutputManagerFactory<>();
 
       WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
@@ -868,7 +809,7 @@ class FlinkStreamingTransformTranslators {
                 (Coder) windowedWorkItemCoder,
                 new TupleTag<KV<K, OutputT>>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
-                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+                new DoFnOperator.DefaultOutputManagerFactory<KV<K, OutputT>>(),
                 windowingStrategy,
                 new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
                 Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -894,7 +835,7 @@ class FlinkStreamingTransformTranslators {
                 (Coder) windowedWorkItemCoder,
                 new TupleTag<KV<K, OutputT>>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
-                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+                new DoFnOperator.DefaultOutputManagerFactory<KV<K, OutputT>>(),
                 windowingStrategy,
                 transformSideInputs.f0,
                 sideInputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 594fe0e..8c27ed9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -87,6 +87,7 @@ import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 import org.joda.time.Instant;
 
 /**
@@ -98,9 +99,9 @@ import org.joda.time.Instant;
  *                 type when we have additional tagged outputs
  */
 public class DoFnOperator<InputT, FnOutputT, OutputT>
-    extends AbstractStreamOperator<OutputT>
-    implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
-      TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT>,
+    extends AbstractStreamOperator<WindowedValue<OutputT>>
+    implements OneInputStreamOperator<WindowedValue<InputT>, WindowedValue<OutputT>>,
+      TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, WindowedValue<OutputT>>,
     KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> {
 
   protected DoFn<InputT, FnOutputT> doFn;
@@ -662,7 +663,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
    * a Flink {@link Output}.
    */
   interface OutputManagerFactory<OutputT> extends Serializable {
-    DoFnRunners.OutputManager create(Output<StreamRecord<OutputT>> output);
+    DoFnRunners.OutputManager create(Output<StreamRecord<WindowedValue<OutputT>>> output);
   }
 
   /**
@@ -673,14 +674,15 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   public static class DefaultOutputManagerFactory<OutputT>
       implements OutputManagerFactory<OutputT> {
     @Override
-    public DoFnRunners.OutputManager create(final Output<StreamRecord<OutputT>> output) {
+    public DoFnRunners.OutputManager create(
+        final Output<StreamRecord<WindowedValue<OutputT>>> output) {
       return new DoFnRunners.OutputManager() {
         @Override
         public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
           // with tagged outputs we can't get around this because we don't
           // know our own output type...
           @SuppressWarnings("unchecked")
-          OutputT castValue = (OutputT) value;
+          WindowedValue<OutputT> castValue = (WindowedValue<OutputT>) value;
           output.collect(new StreamRecord<>(castValue));
         }
       };
@@ -692,22 +694,34 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
    * {@link DoFnRunners.OutputManager} that can write to multiple logical
    * outputs by unioning them in a {@link RawUnionValue}.
    */
-  public static class MultiOutputOutputManagerFactory
-      implements OutputManagerFactory<RawUnionValue> {
+  public static class MultiOutputOutputManagerFactory<OutputT>
+      implements OutputManagerFactory<OutputT> {
 
-    Map<TupleTag<?>, Integer> mapping;
+    private TupleTag<?> mainTag;
+    Map<TupleTag<?>, OutputTag<WindowedValue<?>>> mapping;
 
-    public MultiOutputOutputManagerFactory(Map<TupleTag<?>, Integer> mapping) {
+    public MultiOutputOutputManagerFactory(
+        TupleTag<?> mainTag,
+        Map<TupleTag<?>, OutputTag<WindowedValue<?>>> mapping) {
+      this.mainTag = mainTag;
       this.mapping = mapping;
     }
 
     @Override
-    public DoFnRunners.OutputManager create(final Output<StreamRecord<RawUnionValue>> output) {
+    public DoFnRunners.OutputManager create(
+        final Output<StreamRecord<WindowedValue<OutputT>>> output) {
       return new DoFnRunners.OutputManager() {
         @Override
         public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
-          int intTag = mapping.get(tag);
-          output.collect(new StreamRecord<>(new RawUnionValue(intTag, value)));
+          if (tag.equals(mainTag)) {
+            @SuppressWarnings("unchecked")
+            WindowedValue<OutputT> outputValue = (WindowedValue<OutputT>) value;
+            output.collect(new StreamRecord<>(outputValue));
+          } else {
+            @SuppressWarnings("unchecked")
+            OutputTag<WindowedValue<T>> outputTag = (OutputTag) mapping.get(tag);
+            output.<WindowedValue<T>>collect(outputTag, new StreamRecord<>(value));
+          }
         }
       };
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index bf64ede..ea578b9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.operators.InternalTimer;
  * Flink operator for executing window {@link DoFn DoFns}.
  */
 public class WindowDoFnOperator<K, InputT, OutputT>
-    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> {
+    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, KV<K, OutputT>> {
 
   private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
 
@@ -56,7 +56,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
       Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder,
       TupleTag<KV<K, OutputT>> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
-      OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory,
+      OutputManagerFactory<KV<K, OutputT>> outputManagerFactory,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<Integer, PCollectionView<?>> sideInputTagMapping,
       Collection<PCollectionView<?>> sideInputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 8382a2d..bc0b1c2 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -173,13 +173,12 @@ public class PipelineOptionsTest {
     final byte[] serialized = SerializationUtils.serialize(doFnOperator);
 
     @SuppressWarnings("unchecked")
-    DoFnOperator<Object, Object, Object> deserialized =
-        (DoFnOperator<Object, Object, Object>) SerializationUtils.deserialize(serialized);
+    DoFnOperator<Object, Object, Object> deserialized = SerializationUtils.deserialize(serialized);
 
     TypeInformation<WindowedValue<Object>> typeInformation = TypeInformation.of(
         new TypeHint<WindowedValue<Object>>() {});
 
-    OneInputStreamOperatorTestHarness<WindowedValue<Object>, Object> testHarness =
+    OneInputStreamOperatorTestHarness<WindowedValue<Object>, WindowedValue<Object>> testHarness =
         new OneInputStreamOperatorTestHarness<>(deserialized,
             typeInformation.createSerializer(new ExecutionConfig()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 79bc0e0..132242e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -65,6 +65,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.OutputTag;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -123,7 +124,7 @@ public class DoFnOperatorTest {
         PipelineOptionsFactory.as(FlinkPipelineOptions.class),
         null);
 
-    OneInputStreamOperatorTestHarness<WindowedValue<String>, String> testHarness =
+    OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
         new OneInputStreamOperatorTestHarness<>(doFnOperator);
 
     testHarness.open();
@@ -147,26 +148,27 @@ public class DoFnOperatorTest {
     TupleTag<String> mainOutput = new TupleTag<>("main-output");
     TupleTag<String> additionalOutput1 = new TupleTag<>("output-1");
     TupleTag<String> additionalOutput2 = new TupleTag<>("output-2");
-    ImmutableMap<TupleTag<?>, Integer> outputMapping = ImmutableMap.<TupleTag<?>, Integer>builder()
-        .put(mainOutput, 1)
-        .put(additionalOutput1, 2)
-        .put(additionalOutput2, 3)
-        .build();
+    ImmutableMap<TupleTag<?>, OutputTag<?>> outputMapping =
+        ImmutableMap.<TupleTag<?>, OutputTag<?>>builder()
+            .put(mainOutput, new OutputTag<String>(mainOutput.getId()){})
+            .put(additionalOutput1, new OutputTag<String>(additionalOutput1.getId()){})
+            .put(additionalOutput2, new OutputTag<String>(additionalOutput2.getId()){})
+            .build();
 
-    DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
         new MultiOutputDoFn(additionalOutput1, additionalOutput2),
         "stepName",
         windowedValueCoder,
         mainOutput,
         ImmutableList.<TupleTag<?>>of(additionalOutput1, additionalOutput2),
-        new DoFnOperator.MultiOutputOutputManagerFactory(outputMapping),
+        new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, outputMapping),
         WindowingStrategy.globalDefault(),
         new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
         Collections.<PCollectionView<?>>emptyList(), /* side inputs */
         PipelineOptionsFactory.as(FlinkPipelineOptions.class),
         null);
 
-    OneInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue> testHarness =
+    OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
         new OneInputStreamOperatorTestHarness<>(doFnOperator);
 
     testHarness.open();
@@ -176,17 +178,26 @@ public class DoFnOperatorTest {
     testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("hello")));
 
     assertThat(
-        this.stripStreamRecordFromRawUnion(testHarness.getOutput()),
+        this.stripStreamRecord(testHarness.getOutput()),
+        contains(
+            WindowedValue.valueInGlobalWindow("got: hello")));
+
+    assertThat(
+        this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput1))),
         contains(
-            new RawUnionValue(2, WindowedValue.valueInGlobalWindow("extra: one")),
-            new RawUnionValue(3, WindowedValue.valueInGlobalWindow("extra: two")),
-            new RawUnionValue(1, WindowedValue.valueInGlobalWindow("got: hello")),
-            new RawUnionValue(2, WindowedValue.valueInGlobalWindow("got: hello")),
-            new RawUnionValue(3, WindowedValue.valueInGlobalWindow("got: hello"))));
+            WindowedValue.valueInGlobalWindow("extra: one"),
+            WindowedValue.valueInGlobalWindow("got: hello")));
+
+    assertThat(
+        this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput2))),
+        contains(
+            WindowedValue.valueInGlobalWindow("extra: two"),
+            WindowedValue.valueInGlobalWindow("got: hello")));
 
     testHarness.close();
   }
 
+
   @Test
   public void testLateDroppingForStatefulFn() throws Exception {
 
@@ -212,13 +223,13 @@ public class DoFnOperatorTest {
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
-    DoFnOperator<Integer, String, WindowedValue<String>> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<Integer, String, String> doFnOperator = new DoFnOperator<>(
         fn,
         "stepName",
         windowedValueCoder,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
-        new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<String>>(),
+        new DoFnOperator.DefaultOutputManagerFactory<String>(),
         windowingStrategy,
         new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
         Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -325,14 +336,14 @@ public class DoFnOperatorTest {
     TupleTag<KV<String, Integer>> outputTag = new TupleTag<>("main-output");
 
     DoFnOperator<
-        KV<String, Integer>, KV<String, Integer>, WindowedValue<KV<String, Integer>>> doFnOperator =
+        KV<String, Integer>, KV<String, Integer>, KV<String, Integer>> doFnOperator =
         new DoFnOperator<>(
             fn,
             "stepName",
             windowedValueCoder,
             outputTag,
             Collections.<TupleTag<?>>emptyList(),
-            new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<String, Integer>>>(),
+            new DoFnOperator.DefaultOutputManagerFactory<KV<String, Integer>>(),
             windowingStrategy,
             new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
             Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -435,8 +446,8 @@ public class DoFnOperatorTest {
         PipelineOptionsFactory.as(FlinkPipelineOptions.class),
         keyCoder);
 
-    TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, String> testHarness =
-        new TwoInputStreamOperatorTestHarness<>(doFnOperator);
+    TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>>
+        testHarness = new TwoInputStreamOperatorTestHarness<>(doFnOperator);
 
     if (keyed) {
       // we use a dummy key for the second input since it is considered to be broadcast
@@ -527,19 +538,19 @@ public class DoFnOperatorTest {
     });
   }
 
-  private Iterable<RawUnionValue> stripStreamRecordFromRawUnion(Iterable<Object> input) {
+  private Iterable<WindowedValue<String>> stripStreamRecord(Iterable<?> input) {
     return FluentIterable.from(input).filter(new Predicate<Object>() {
       @Override
       public boolean apply(@Nullable Object o) {
-        return o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue;
+        return o instanceof StreamRecord;
       }
-    }).transform(new Function<Object, RawUnionValue>() {
+    }).transform(new Function<Object, WindowedValue<String>>() {
       @Nullable
       @Override
       @SuppressWarnings({"unchecked", "rawtypes"})
-      public RawUnionValue apply(@Nullable Object o) {
-        if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue) {
-          return (RawUnionValue) ((StreamRecord) o).getValue();
+      public WindowedValue<String> apply(@Nullable Object o) {
+        if (o instanceof StreamRecord) {
+          return (WindowedValue<String>) ((StreamRecord) o).getValue();
         }
         throw new RuntimeException("unreachable");
       }