You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 01:02:28 UTC

[3/6] beam git commit: Requires specifying a Coder on PCollection.createPrimitiveOutputInternal

Requires specifying a Coder on PCollection.createPrimitiveOutputInternal

The coder can still be null, in which case it is left unspecified.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bb1bf3c1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bb1bf3c1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bb1bf3c1

Branch: refs/heads/master
Commit: bb1bf3c19ca0baa2c04cec9863bfcaca2024f94e
Parents: e017a0e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 26 17:14:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 15:40:46 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |   5 +-
 .../FlattenPCollectionTranslator.java           |  15 +-
 .../apex/translation/ParDoTranslator.java       |   7 +-
 .../translation/ApexGroupByKeyOperatorTest.java |   6 +-
 .../construction/PCollectionTranslation.java    |   8 +-
 .../construction/PTransformReplacements.java    |   6 +
 .../core/construction/PrimitiveCreate.java      |  14 +-
 .../core/construction/SplittableParDo.java      |   3 +
 .../UnboundedReadFromBoundedSource.java         |   5 -
 .../construction/PTransformMatchersTest.java    |  79 +++++-----
 .../construction/ReplacementOutputsTest.java    |  14 +-
 .../core/GroupByKeyViaGroupByKeyOnly.java       |  15 +-
 .../core/SplittableParDoViaKeyedWorkItems.java  |   9 +-
 .../beam/runners/direct/DirectGroupByKey.java   |  31 ++--
 .../direct/ParDoMultiOverrideFactory.java       |   3 +
 .../direct/TestStreamEvaluatorFactory.java      |   8 +-
 .../runners/direct/ViewOverrideFactory.java     |   5 +-
 .../runners/direct/CommittedResultTest.java     |  26 +++-
 .../runners/direct/EvaluationContextTest.java   |   8 +-
 .../runners/flink/CreateStreamingFlinkView.java |   5 +-
 .../beam/runners/dataflow/AssignWindows.java    |   4 +-
 .../runners/dataflow/BatchViewOverrides.java    |  19 +--
 .../runners/dataflow/CreateDataflowView.java    |   5 +-
 .../beam/runners/dataflow/DataflowRunner.java   | 147 ++++++++++---------
 .../dataflow/PrimitiveParDoSingleFactory.java   |  12 +-
 .../DataflowPipelineTranslatorTest.java         |  11 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  11 +-
 .../transforms/DataflowGroupByKeyTest.java      |  12 +-
 .../dataflow/transforms/DataflowViewTest.java   |  14 +-
 .../beam/runners/spark/io/CreateStream.java     |  11 +-
 .../translation/StorageLevelPTransform.java     |  10 +-
 .../util/SinglePrimitiveOutputPTransform.java   |  51 -------
 .../main/java/org/apache/beam/sdk/io/Read.java  |  21 ++-
 .../org/apache/beam/sdk/testing/TestStream.java |   5 +-
 .../org/apache/beam/sdk/transforms/Flatten.java |  22 +--
 .../apache/beam/sdk/transforms/GroupByKey.java  |  12 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   2 +
 .../org/apache/beam/sdk/transforms/View.java    |   5 +-
 .../beam/sdk/transforms/windowing/Window.java   |   2 +-
 .../org/apache/beam/sdk/values/PCollection.java |   9 +-
 .../beam/sdk/values/PCollectionTuple.java       |  10 +-
 .../sdk/runners/TransformHierarchyTest.java     |  41 +++---
 .../beam/sdk/runners/TransformTreeTest.java     |  12 +-
 .../apache/beam/sdk/transforms/FlattenTest.java |   2 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |  11 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |  11 +-
 .../beam/sdk/values/PCollectionTupleTest.java   |   7 +-
 47 files changed, 357 insertions(+), 394 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index fd0a1c9..cee524e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -227,9 +227,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
     @Override
     public PCollection<ElemT> expand(PCollection<ElemT> input) {
-      return PCollection.<ElemT>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
     }
 
     public PCollectionView<ViewT> getView() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index 440b801..189cb65 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -110,8 +110,12 @@ class FlattenPCollectionTranslator<T> implements
           }
 
           if (collections.size() > 2) {
-            PCollection<T> intermediateCollection = intermediateCollection(collection,
-                collection.getCoder());
+            PCollection<T> intermediateCollection =
+                PCollection.createPrimitiveOutputInternal(
+                    collection.getPipeline(),
+                    collection.getWindowingStrategy(),
+                    collection.isBounded(),
+                    collection.getCoder());
             context.addOperator(operator, operator.out, intermediateCollection);
             remainingCollections.add(intermediateCollection);
           } else {
@@ -135,11 +139,4 @@ class FlattenPCollectionTranslator<T> implements
     }
   }
 
-  static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
-    PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-        input.getWindowingStrategy(), input.isBounded());
-    output.setCoder(outputCoder);
-    return output;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index e46687a..be11b02 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -241,8 +241,11 @@ class ParDoTranslator<InputT, OutputT>
     }
 
     PCollection<Object> resultCollection =
-        FlattenPCollectionTranslator.intermediateCollection(
-            firstSideInput, firstSideInput.getCoder());
+        PCollection.createPrimitiveOutputInternal(
+            firstSideInput.getPipeline(),
+            firstSideInput.getWindowingStrategy(),
+            firstSideInput.isBounded(),
+            firstSideInput.getCoder());
     FlattenPCollectionTranslator.flattenCollections(
         sourceCollections, unionTags, resultCollection, context);
     return resultCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
index 206b430..63a218b 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
@@ -59,9 +59,9 @@ public class ApexGroupByKeyOperatorTest {
 
     WindowingStrategy<?, ?> ws = WindowingStrategy.of(FixedWindows.of(
         Duration.standardSeconds(10)));
-    PCollection<KV<String, Integer>> input = PCollection.createPrimitiveOutputInternal(pipeline,
-        ws, IsBounded.BOUNDED);
-    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+    PCollection<KV<String, Integer>> input =
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, ws, IsBounded.BOUNDED, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
 
     ApexGroupByKeyOperator<String, Integer> operator = new ApexGroupByKeyOperator<>(options,
         input, new ApexStateInternals.ApexStateBackend()

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index c0a5acf..c256e4c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -52,10 +52,10 @@ public class PCollectionTranslation {
 
     Coder<?> coder = components.getCoder(pCollection.getCoderId());
     return PCollection.createPrimitiveOutputInternal(
-            pipeline,
-            components.getWindowingStrategy(pCollection.getWindowingStrategyId()),
-            fromProto(pCollection.getIsBounded()))
-        .setCoder((Coder) coder);
+        pipeline,
+        components.getWindowingStrategy(pCollection.getWindowingStrategyId()),
+        fromProto(pCollection.getIsBounded()),
+        (Coder) coder);
   }
 
   public static IsBounded isBounded(RunnerApi.PCollection pCollection) {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
index 706a956..35bad15 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.collect.Iterables;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -66,4 +67,9 @@ public class PTransformReplacements {
         ignoredTags);
     return mainInput;
   }
+
+  public static <T> PCollection<T> getSingletonMainOutput(
+      AppliedPTransform<?, PCollection<T>, ? extends PTransform<?, PCollection<T>>> transform) {
+    return ((PCollection<T>) Iterables.getOnlyElement(transform.getOutputs().values()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
index f43d23b..62b6d0a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
@@ -18,7 +18,9 @@
 
 package org.apache.beam.runners.core.construction;
 
+import com.google.common.collect.Iterables;
 import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.Create;
@@ -36,15 +38,17 @@ import org.apache.beam.sdk.values.WindowingStrategy;
  */
 public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> {
   private final Create.Values<T> transform;
+  private final Coder<T> coder;
 
-  private PrimitiveCreate(Create.Values<T> transform) {
+  private PrimitiveCreate(Create.Values<T> transform, Coder<T> coder) {
     this.transform = transform;
+    this.coder = coder;
   }
 
   @Override
   public PCollection<T> expand(PBegin input) {
     return PCollection.createPrimitiveOutputInternal(
-        input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+        input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED, coder);
   }
 
   public Iterable<T> getElements() {
@@ -60,7 +64,11 @@ public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> {
     public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
         AppliedPTransform<PBegin, PCollection<T>, Values<T>> transform) {
       return PTransformReplacement.of(
-          transform.getPipeline().begin(), new PrimitiveCreate<T>(transform.getTransform()));
+          transform.getPipeline().begin(),
+          new PrimitiveCreate<T>(
+              transform.getTransform(),
+              ((PCollection<T>) Iterables.getOnlyElement(transform.getOutputs().values()))
+                  .getCoder()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index e71187b..bcc5de8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -273,6 +274,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
           PCollectionTuple.ofPrimitiveOutputsInternal(
               input.getPipeline(),
               TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
+              // TODO
+              Collections.<TupleTag<?>, Coder<?>>emptyMap(),
               windowingStrategy,
               input.isBounded().and(signature.isBoundedPerElement()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index f35f4c3..55f9519 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -91,11 +91,6 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
   }
 
   @Override
-  protected Coder<T> getDefaultOutputCoder() {
-    return source.getOutputCoder();
-  }
-
-  @Override
   public String getKindString() {
     return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 1862699..fa7e1e9 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -100,15 +100,16 @@ public class PTransformMatchersTest implements Serializable {
   private AppliedPTransform<?, ?, ?> getAppliedTransform(PTransform pardo) {
     PCollection<KV<String, Integer>> input =
         PCollection.createPrimitiveOutputInternal(
-            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            p,
+            WindowingStrategy.globalDefault(),
+            IsBounded.BOUNDED,
+            KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
     input.setName("dummy input");
-    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
 
     PCollection<Integer> output =
         PCollection.createPrimitiveOutputInternal(
-            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
     output.setName("dummy output");
-    output.setCoder(VarIntCoder.of());
 
     return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p);
   }
@@ -133,7 +134,7 @@ public class PTransformMatchersTest implements Serializable {
       @Override
       public PCollection<Integer> expand(PCollection<KV<String, Integer>> input) {
         return PCollection.createPrimitiveOutputInternal(
-            input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+            input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), VarIntCoder.of());
       }
     }
     PTransformMatcher matcher = PTransformMatchers.classEqualTo(MyPTransform.class);
@@ -425,14 +426,14 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithEmptyFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+            .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
                 "EmptyFlatten",
                 Collections.<TupleTag<?>, PValue>emptyMap(),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.pCollections(),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                Flatten.<Integer>pCollections(),
                 p);
 
     assertThat(PTransformMatchers.emptyFlatten().matches(application), is(true));
@@ -442,17 +443,17 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithNonEmptyFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+            .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
                 "Flatten",
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.pCollections(),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                Flatten.<Integer>pCollections(),
                 p);
 
     assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
@@ -462,15 +463,15 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithNonFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
+            .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
                 "EmptyFlatten",
                 Collections.<TupleTag<?>, PValue>emptyMap(),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.iterables() /* This isn't actually possible to construct,
-                                 * but for the sake of example */,
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                /* This isn't actually possible to construct, but for the sake of example */
+                Flatten.<Integer>iterables(),
                 p);
 
     assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
@@ -480,17 +481,17 @@ public class PTransformMatchersTest implements Serializable {
   public void flattenWithDuplicateInputsWithoutDuplicates() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+            .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
                 "Flatten",
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.pCollections(),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                Flatten.<Integer>pCollections(),
                 p);
 
     assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
@@ -498,22 +499,22 @@ public class PTransformMatchersTest implements Serializable {
 
   @Test
   public void flattenWithDuplicateInputsWithDuplicates() {
-    PCollection<Object> duplicate =
+    PCollection<Integer> duplicate =
         PCollection.createPrimitiveOutputInternal(
-            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+            .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
                 "Flatten",
                 ImmutableMap.<TupleTag<?>, PValue>builder()
-                    .put(new TupleTag<Object>(), duplicate)
-                    .put(new TupleTag<Object>(), duplicate)
+                    .put(new TupleTag<Integer>(), duplicate)
+                    .put(new TupleTag<Integer>(), duplicate)
                     .build(),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.pCollections(),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                Flatten.<Integer>pCollections(),
                 p);
 
     assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(true));
@@ -523,15 +524,15 @@ public class PTransformMatchersTest implements Serializable {
   public void flattenWithDuplicateInputsNonFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
+            .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
                 "EmptyFlatten",
                 Collections.<TupleTag<?>, PValue>emptyMap(),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.iterables() /* This isn't actually possible to construct,
-                                 * but for the sake of example */,
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                /* This isn't actually possible to construct, but for the sake of example */
+                Flatten.<Integer>iterables(),
                 p);
 
     assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
index f8d01e9..0165e4b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertThat;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.util.Map;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -50,23 +52,23 @@ public class ReplacementOutputsTest {
 
   private PCollection<Integer> ints =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
   private PCollection<Integer> moreInts =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
   private PCollection<String> strs =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
 
   private PCollection<Integer> replacementInts =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
   private PCollection<Integer> moreReplacementInts =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
   private PCollection<String> replacementStrs =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
 
   @Test
   public void singletonSucceeds() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
index fca3c76..1fdf07c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -111,12 +111,10 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
     @Override
     public PCollection<KV<K, Iterable<WindowedValue<V>>>> expand(PCollection<KV<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    }
-
-    @Override
-    public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
-      return GroupByKey.getOutputKvCoder(input.getCoder());
+          input.getPipeline(),
+          input.getWindowingStrategy(),
+          input.isBounded(),
+          (Coder) GroupByKey.getOutputKvCoder(input.getCoder()));
     }
   }
 
@@ -244,9 +242,8 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
       Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
       Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
 
-      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
-          input.getPipeline(), windowingStrategy, input.isBounded())
-          .setCoder(outputKvCoder);
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), windowingStrategy, input.isBounded(), outputKvCoder);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 6e97645..af720fd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -72,8 +72,15 @@ public class SplittableParDoViaKeyedWorkItems {
           PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
     @Override
     public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
+      KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          input.isBounded(),
+          KeyedWorkItemCoder.of(
+              kvCoder.getKeyCoder(),
+              kvCoder.getValueCoder(),
+              input.getWindowingStrategy().getWindowFn().windowCoder()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 06b8e29..3ba04e7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -78,22 +77,18 @@ class DirectGroupByKey<K, V>
     @Override
     public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          input.isBounded(),
+          KeyedWorkItemCoder.of(
+              GroupByKey.getKeyCoder(input.getCoder()),
+              GroupByKey.getInputValueCoder(input.getCoder()),
+              input.getWindowingStrategy().getWindowFn().windowCoder()));
     }
 
     DirectGroupByKeyOnly() {}
 
     @Override
-    protected Coder<?> getDefaultOutputCoder(
-        @SuppressWarnings("unused") PCollection<KV<K, V>> input)
-        throws CannotProvideCoderException {
-      return KeyedWorkItemCoder.of(
-          GroupByKey.getKeyCoder(input.getCoder()),
-          GroupByKey.getInputValueCoder(input.getCoder()),
-          input.getWindowingStrategy().getWindowFn().windowCoder());
-    }
-
-    @Override
     public String getUrn() {
       return DIRECT_GBKO_URN;
     }
@@ -135,17 +130,11 @@ class DirectGroupByKey<K, V>
     }
 
     @Override
-    protected Coder<?> getDefaultOutputCoder(
-        @SuppressWarnings("unused") PCollection<KeyedWorkItem<K, V>> input)
-        throws CannotProvideCoderException {
-      KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
-      return KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder()));
-    }
-
-    @Override
     public PCollection<KV<K, Iterable<V>>> expand(PCollection<KeyedWorkItem<K, V>> input) {
+      KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), outputWindowingStrategy, input.isBounded());
+          input.getPipeline(), outputWindowingStrategy, input.isBounded(),
+          KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder())));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 891d102..3f04b56 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
 import static com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
@@ -248,6 +249,8 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
           PCollectionTuple.ofPrimitiveOutputsInternal(
               input.getPipeline(),
               TupleTagList.of(getMainOutputTag()).and(getAdditionalOutputTags().getAll()),
+              // TODO
+              Collections.<TupleTag<?>, Coder<?>>emptyMap(),
               input.getWindowingStrategy(),
               input.isBounded());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 16c8589..49e7be7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -207,9 +207,11 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
       @Override
       public PCollection<T> expand(PBegin input) {
         runner.setClockSupplier(new TestClockSupplier());
-        return PCollection.<T>createPrimitiveOutputInternal(
-                input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-            .setCoder(original.getValueCoder());
+        return PCollection.createPrimitiveOutputInternal(
+            input.getPipeline(),
+            WindowingStrategy.globalDefault(),
+            IsBounded.UNBOUNDED,
+            original.getValueCoder());
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 5dcf016..c2255fe 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -115,9 +115,8 @@ class ViewOverrideFactory<ElemT, ViewT>
     @Override
     @SuppressWarnings("deprecation")
     public PCollection<Iterable<ElemT>> expand(PCollection<Iterable<ElemT>> input) {
-      return PCollection.<Iterable<ElemT>>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
     }
 
     @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 8b95b34..29ed55d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -113,13 +114,24 @@ public class CommittedResultTest implements Serializable {
 
   @Test
   public void getOutputsEqualInput() {
-    List<? extends CommittedBundle<?>> outputs =
-        ImmutableList.of(bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
-            WindowingStrategy.globalDefault(),
-            PCollection.IsBounded.BOUNDED)).commit(Instant.now()),
-            bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
-                WindowingStrategy.globalDefault(),
-                PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
+    List<? extends CommittedBundle<Integer>> outputs =
+        ImmutableList.of(
+            bundleFactory
+                .createBundle(
+                    PCollection.createPrimitiveOutputInternal(
+                        p,
+                        WindowingStrategy.globalDefault(),
+                        PCollection.IsBounded.BOUNDED,
+                        VarIntCoder.of()))
+                .commit(Instant.now()),
+            bundleFactory
+                .createBundle(
+                    PCollection.createPrimitiveOutputInternal(
+                        p,
+                        WindowingStrategy.globalDefault(),
+                        PCollection.IsBounded.UNBOUNDED,
+                        VarIntCoder.of()))
+                .commit(Instant.now()));
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 699a318..cc9ce60 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
@@ -127,8 +128,11 @@ public class EvaluationContextTest {
   public void writeToViewWriterThenReadReads() {
     PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter =
         context.createPCollectionViewWriter(
-            PCollection.<Iterable<Integer>>createPrimitiveOutputInternal(
-                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED),
+            PCollection.createPrimitiveOutputInternal(
+                p,
+                WindowingStrategy.globalDefault(),
+                IsBounded.BOUNDED,
+                IterableCoder.of(VarIntCoder.of())),
             view);
     BoundedWindow window = new TestBoundedWindow(new Instant(1024L));
     BoundedWindow second = new TestBoundedWindow(new Instant(899999L));

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
index 0cc3aec..3114a6f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
@@ -120,9 +120,8 @@ class CreateStreamingFlinkView<ElemT, ViewT>
 
     @Override
     public PCollection<List<ElemT>> expand(PCollection<List<ElemT>> input) {
-      return PCollection.<List<ElemT>>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
     }
 
     public PCollectionView<ViewT> getView() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
index 572b005..d015d2b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -59,8 +59,8 @@ class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
         transform.getOutputStrategyInternal(input.getWindowingStrategy());
     if (transform.getWindowFn() != null) {
       // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
-      return PCollection.<T>createPrimitiveOutputInternal(
-                            input.getPipeline(), outputStrategy, input.isBounded());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), outputStrategy, input.isBounded(), input.getCoder());
     } else {
       // If the windowFn didn't change, we just run a pass-through transform and then set the
       // new windowing strategy.

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ad3faed..9a77b4b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -1258,18 +1258,13 @@ class BatchViewOverrides {
 
     @Override
     public PCollection<KV<K1, Iterable<KV<K2, V>>>> expand(PCollection<KV<K1, KV<K2, V>>> input) {
-      PCollection<KV<K1, Iterable<KV<K2, V>>>> rval =
-          PCollection.<KV<K1, Iterable<KV<K2, V>>>>createPrimitiveOutputInternal(
-              input.getPipeline(),
-              WindowingStrategy.globalDefault(),
-              IsBounded.BOUNDED);
-
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      KvCoder<K1, KV<K2, V>> inputCoder = (KvCoder) input.getCoder();
-      rval.setCoder(
-          KvCoder.of(inputCoder.getKeyCoder(),
-              IterableCoder.of(inputCoder.getValueCoder())));
-      return rval;
+      @SuppressWarnings("unchecked")
+      KvCoder<K1, KV<K2, V>> inputCoder = (KvCoder<K1, KV<K2, V>>) input.getCoder();
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          IsBounded.BOUNDED,
+          KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index caad7f8..3b01d69 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -37,9 +37,8 @@ public class CreateDataflowView<ElemT, ViewT>
 
   @Override
   public PCollection<ElemT> expand(PCollection<ElemT> input) {
-    return PCollection.<ElemT>createPrimitiveOutputInternal(
-            input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-        .setCoder(input.getCoder());
+    return PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
   }
 
   public PCollectionView<ViewT> getView() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8fce5b4..6999616 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -321,7 +321,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         overridesBuilder.add(
             PTransformOverride.of(
                 PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
-                new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this)));
+                new StreamingPubsubIOReadOverrideFactory()));
       }
       if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
         overridesBuilder.add(
@@ -359,11 +359,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               // must precede it
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(Read.Bounded.class),
-                  new ReflectiveRootOverrideFactory(StreamingBoundedRead.class, this)))
+                  new StreamingBoundedReadOverrideFactory()))
           .add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(Read.Unbounded.class),
-                  new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this)))
+                  new StreamingUnboundedReadOverrideFactory()))
           .add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
@@ -448,38 +448,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  private static class ReflectiveRootOverrideFactory<T>
-      implements PTransformOverrideFactory<
-          PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> {
-    private final Class<PTransform<PBegin, PCollection<T>>> replacement;
-    private final DataflowRunner runner;
-
-    private ReflectiveRootOverrideFactory(
-        Class<PTransform<PBegin, PCollection<T>>> replacement, DataflowRunner runner) {
-      this.replacement = replacement;
-      this.runner = runner;
-    }
-
-    @Override
-    public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
-        AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) {
-      PTransform<PInput, PCollection<T>> original = transform.getTransform();
-      return PTransformReplacement.of(
-          transform.getPipeline().begin(),
-          InstanceBuilder.ofType(replacement)
-              .withArg(DataflowRunner.class, runner)
-              .withArg(
-                  (Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original)
-              .build());
-    }
-
-    @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
-      return ReplacementOutputs.singleton(outputs, newOutput);
-    }
-  }
-
   private String debuggerMessage(String projectId, String uniquifier) {
     return String.format("To debug your job, visit Google Cloud Debugger at: "
         + "https://console.developers.google.com/debug?project=%s&dbgee=%s",
@@ -838,6 +806,24 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // PubsubIO translations
   // ================================================================================
 
+  private static class StreamingPubsubIOReadOverrideFactory
+      implements PTransformOverrideFactory<
+          PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> {
+    @Override
+    public PTransformReplacement<PBegin, PCollection<PubsubMessage>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> transform) {
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(), new StreamingPubsubIORead(transform.getTransform()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollection<PubsubMessage> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+
   /**
    * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we can
    * instead defer to Windmill's implementation.
@@ -846,9 +832,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       extends PTransform<PBegin, PCollection<PubsubMessage>> {
     private final PubsubUnboundedSource transform;
 
-    /** Builds an instance of this class from the overridden transform. */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingPubsubIORead(DataflowRunner runner, PubsubUnboundedSource transform) {
+    public StreamingPubsubIORead(PubsubUnboundedSource transform) {
       this.transform = transform;
     }
 
@@ -858,9 +842,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     public PCollection<PubsubMessage> expand(PBegin input) {
-      return PCollection.<PubsubMessage>createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-          .setCoder(new PubsubMessageWithAttributesCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          IsBounded.UNBOUNDED,
+          new PubsubMessageWithAttributesCoder());
     }
 
     @Override
@@ -1129,12 +1115,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     @Override
     public PCollection<byte[]> expand(PBegin input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), isBounded);
-    }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder() {
-      return ByteArrayCoder.of();
+          input.getPipeline(), WindowingStrategy.globalDefault(), isBounded, ByteArrayCoder.of());
     }
 
     private static class Translator implements TransformTranslator<Impulse> {
@@ -1157,6 +1138,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
+  private static class StreamingUnboundedReadOverrideFactory<T>
+      implements PTransformOverrideFactory<PBegin, PCollection<T>, Read.Unbounded<T>> {
+    @Override
+    public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<T>, Read.Unbounded<T>> transform) {
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(), new StreamingUnboundedRead<>(transform.getTransform()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
   /**
    * Specialized implementation for
    * {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the
@@ -1168,18 +1165,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   private static class StreamingUnboundedRead<T> extends PTransform<PBegin, PCollection<T>> {
     private final UnboundedSource<T, ?> source;
 
-    /** Builds an instance of this class from the overridden transform. */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded<T> transform) {
+    public StreamingUnboundedRead(Read.Unbounded<T> transform) {
       this.source = transform.getSource();
     }
 
     @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return source.getOutputCoder();
-    }
-
-    @Override
     public final PCollection<T> expand(PBegin input) {
       source.validate();
 
@@ -1206,13 +1196,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
       @Override
       public final PCollection<ValueWithRecordId<T>> expand(PInput input) {
-        return PCollection.<ValueWithRecordId<T>>createPrimitiveOutputInternal(
-            input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
-      }
-
-      @Override
-      protected Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
-        return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder());
+        return PCollection.createPrimitiveOutputInternal(
+            input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED,
+            ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder()));
       }
 
       @Override
@@ -1276,6 +1262,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
+  private static class StreamingBoundedReadOverrideFactory<T>
+      implements PTransformOverrideFactory<PBegin, PCollection<T>, Read.Bounded<T>> {
+    @Override
+    public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<T>, Read.Bounded<T>> transform) {
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(), new StreamingBoundedRead<>(transform.getTransform()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
   /**
    * Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the
    * Dataflow runner in streaming mode.
@@ -1283,18 +1285,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   private static class StreamingBoundedRead<T> extends PTransform<PBegin, PCollection<T>> {
     private final BoundedSource<T> source;
 
-    /** Builds an instance of this class from the overridden transform. */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) {
+    public StreamingBoundedRead(Read.Bounded<T> transform) {
       this.source = transform.getSource();
     }
 
     @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return source.getOutputCoder();
-    }
-
-    @Override
     public final PCollection<T> expand(PBegin input) {
       source.validate();
 
@@ -1404,15 +1399,19 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   static class CombineGroupedValues<K, InputT, OutputT>
       extends PTransform<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>> {
     private final Combine.GroupedValues<K, InputT, OutputT> original;
+    private final Coder<KV<K, OutputT>> outputCoder;
 
-    CombineGroupedValues(GroupedValues<K, InputT, OutputT> original) {
+    CombineGroupedValues(
+        GroupedValues<K, InputT, OutputT> original, Coder<KV<K, OutputT>> outputCoder) {
       this.original = original;
+      this.outputCoder = outputCoder;
     }
 
     @Override
     public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, Iterable<InputT>>> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(),
+          outputCoder);
     }
 
     public Combine.GroupedValues<K, InputT, OutputT> getOriginalCombine() {
@@ -1433,7 +1432,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          new CombineGroupedValues<>(transform.getTransform()));
+          new CombineGroupedValues<>(
+              transform.getTransform(),
+              PTransformReplacements.getSingletonMainOutput(transform).getCoder()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 8611d3c..9252c64 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.DisplayData;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -49,7 +50,9 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
               transform) {
     return PTransformReplacement.of(
         PTransformReplacements.getSingletonMainInput(transform),
-        new ParDoSingle<>(transform.getTransform()));
+        new ParDoSingle<>(
+            transform.getTransform(),
+            PTransformReplacements.getSingletonMainOutput(transform).getCoder()));
   }
 
   /**
@@ -58,15 +61,18 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
   public static class ParDoSingle<InputT, OutputT>
       extends ForwardingPTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
     private final ParDo.SingleOutput<InputT, OutputT> original;
+    private final Coder<OutputT> outputCoder;
 
-    private ParDoSingle(ParDo.SingleOutput<InputT, OutputT> original) {
+    private ParDoSingle(SingleOutput<InputT, OutputT> original, Coder<OutputT> outputCoder) {
       this.original = original;
+      this.outputCoder = outputCoder;
     }
 
     @Override
     public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(),
+          outputCoder);
     }
 
     public DoFn<InputT, OutputT> getFn() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 9a0bdf8..7a99f75 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -650,10 +650,13 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
       // Fails here when attempting to construct a tuple with an unbound object.
       return PCollectionTuple.of(sumTag, sum)
-          .and(doneTag, PCollection.<Void>createPrimitiveOutputInternal(
-              input.getPipeline(),
-              WindowingStrategy.globalDefault(),
-              input.isBounded()));
+          .and(
+              doneTag,
+              PCollection.createPrimitiveOutputInternal(
+                  input.getPipeline(),
+                  WindowingStrategy.globalDefault(),
+                  input.isBounded(),
+                  VoidCoder.of()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 9db73c6..55264a1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -72,7 +72,6 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOption
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
@@ -953,15 +952,11 @@ public class DataflowRunnerTest implements Serializable {
 
     @Override
     public PCollection<Integer> expand(PCollection<Integer> input) {
-      return PCollection.<Integer>createPrimitiveOutputInternal(
+      return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(),
           WindowingStrategy.globalDefault(),
-          input.isBounded());
-    }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder(PCollection<Integer> input) {
-      return input.getCoder();
+          input.isBounded(),
+          input.getCoder());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index 737b408..c198ebf 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Duration;
 import org.junit.Before;
@@ -105,11 +105,11 @@ public class DataflowGroupByKeyTest {
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
               public PCollection<KV<String, Integer>> expand(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                        input.getPipeline(),
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+                return PCollection.createPrimitiveOutputInternal(
+                    input.getPipeline(),
+                    WindowingStrategy.globalDefault(),
+                    PCollection.IsBounded.UNBOUNDED,
+                    KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
               }
             });
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index dea96b9..e2e42a6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -21,6 +21,9 @@ import com.google.api.services.dataflow.Dataflow;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
@@ -33,7 +36,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
@@ -94,11 +96,11 @@ public class DataflowViewTest {
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
               public PCollection<KV<String, Integer>> expand(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                        input.getPipeline(),
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+                return PCollection.createPrimitiveOutputInternal(
+                    input.getPipeline(),
+                    WindowingStrategy.globalDefault(),
+                    PCollection.IsBounded.UNBOUNDED,
+                    KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
               }
             })
         .apply(view);

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index fdcea99..d485d25 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -27,7 +27,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -203,11 +202,9 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
   @Override
   public PCollection<T> expand(PBegin input) {
     return PCollection.createPrimitiveOutputInternal(
-        input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
-  }
-
-  @Override
-  protected Coder<T> getDefaultOutputCoder() throws CannotProvideCoderException {
-    return coder;
+        input.getPipeline(),
+        WindowingStrategy.globalDefault(),
+        PCollection.IsBounded.UNBOUNDED,
+        coder);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
index 0ecfa75..b236ce7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.spark.translation;
 
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -32,12 +31,7 @@ public final class StorageLevelPTransform extends PTransform<PCollection<?>, PCo
   public PCollection<String> expand(PCollection<?> input) {
     return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
         WindowingStrategy.globalDefault(),
-        PCollection.IsBounded.BOUNDED);
+        PCollection.IsBounded.BOUNDED,
+        StringUtf8Coder.of());
   }
-
-  @Override
-  public Coder getDefaultOutputCoder() {
-    return StringUtf8Coder.of();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
deleted file mode 100644
index 299f5ba..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.util;
-
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * A {@link PTransform} wrapping another transform.
- */
-public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PCollection<T>> {
-  private PTransform<PInput, PCollection<T>> transform;
-
-  public SinglePrimitiveOutputPTransform(PTransform<PInput, PCollection<T>> transform) {
-    this.transform = transform;
-  }
-
-  @Override
-  public PCollection<T> expand(PInput input) {
-    try {
-      PCollection<T> collection = PCollection.<T>createPrimitiveOutputInternal(
-              input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-      collection.setCoder(transform.getDefaultOutputCoder(input, collection));
-      return collection;
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalArgumentException(
-          "Unable to infer a coder and no Coder was specified. "
-              + "Please set a coder by invoking Create.withCoder() explicitly.",
-          e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 6e6750d..574ba0c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -95,17 +95,14 @@ public class Read {
     }
 
     @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return source.getOutputCoder();
-    }
-
-    @Override
     public final PCollection<T> expand(PBegin input) {
       source.validate();
 
-      return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
-          WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
-          .setCoder(getDefaultOutputCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          IsBounded.BOUNDED,
+          source.getOutputCoder());
     }
 
     /**
@@ -170,9 +167,11 @@ public class Read {
     @Override
     public final PCollection<T> expand(PBegin input) {
       source.validate();
-
-      return PCollection.<T>createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          IsBounded.UNBOUNDED,
+          source.getOutputCoder());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index d13fcf1..45f4413 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -253,9 +253,8 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
 
   @Override
   public PCollection<T> expand(PBegin input) {
-    return PCollection.<T>createPrimitiveOutputInternal(
-            input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-        .setCoder(coder);
+    return PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, coder);
   }
 
   public Coder<T> getValueCoder() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 25d9c05..8247a58 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableLikeCoder;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -129,25 +128,12 @@ public class Flatten {
         windowingStrategy = WindowingStrategy.globalDefault();
       }
 
-      return PCollection.<T>createPrimitiveOutputInternal(
+      return PCollection.createPrimitiveOutputInternal(
           inputs.getPipeline(),
           windowingStrategy,
-          isBounded);
-    }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder(PCollectionList<T> input)
-        throws CannotProvideCoderException {
-
-      // Take coder from first collection
-      for (PCollection<T> pCollection : input.getAll()) {
-        return pCollection.getCoder();
-      }
-
-      // No inputs
-      throw new CannotProvideCoderException(
-          this.getClass().getSimpleName() + " cannot provide a Coder for"
-          + " empty " + PCollectionList.class.getSimpleName());
+          isBounded,
+          // Take coder from first collection. If there are none, will be left unspecified.
+          inputs.getAll().isEmpty() ? null : inputs.get(0).getCoder());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index 7516b25..3cb0d23 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -217,13 +217,11 @@ public class GroupByKey<K, V>
     // merging windows as needed, using the windows assigned to the
     // key/value input elements and the window merge operation of the
     // window function associated with the input PCollection.
-    return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-        updateWindowingStrategy(input.getWindowingStrategy()), input.isBounded());
-  }
-
-  @Override
-  protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
-    return getOutputKvCoder(input.getCoder());
+    return PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(),
+        updateWindowingStrategy(input.getWindowingStrategy()),
+        input.isBounded(),
+        getOutputKvCoder(input.getCoder()));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 0d03835..bc4f629 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -767,6 +767,8 @@ public class ParDo {
       PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
           input.getPipeline(),
           TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
+          // TODO
+          Collections.<TupleTag<?>, Coder<?>>emptyMap(),
           input.getWindowingStrategy(),
           input.isBounded());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 57dccbc..f6f3af5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -509,9 +509,8 @@ public class View {
 
     @Override
     public PCollection<ElemT> expand(PCollection<ElemT> input) {
-      return PCollection.<ElemT>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index a12be6d..af583e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -484,7 +484,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     @Override
     public PCollection<T> expand(PCollection<T> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), updatedStrategy, input.isBounded());
+          input.getPipeline(), updatedStrategy, input.isBounded(), input.getCoder());
     }
 
     @Override