You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 01:02:26 UTC

[1/6] beam git commit: Removes all overrides of PTransform.getDefaultOutputCoder and deprecates it

Repository: beam
Updated Branches:
  refs/heads/master 38f189063 -> 9e6530adb


Removes all overrides of PTransform.getDefaultOutputCoder and deprecates it


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95e2a00a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95e2a00a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95e2a00a

Branch: refs/heads/master
Commit: 95e2a00a807caaf3b4a9532e29dc38fd9d32e700
Parents: bb1bf3c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 26 16:33:36 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 15:40:46 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/complete/TfIdf.java    | 14 ++--
 .../core/construction/ForwardingPTransform.java | 18 ++---
 .../construction/ForwardingPTransformTest.java  | 17 ++++-
 .../beam/runners/direct/MultiStepCombine.java   | 10 +--
 .../beam/runners/dataflow/AssignWindows.java    |  8 +--
 .../DataflowPipelineTranslatorTest.java         | 10 ---
 .../java/org/apache/beam/sdk/io/AvroIO.java     |  6 --
 .../sdk/io/BoundedReadFromUnboundedSource.java  |  8 +--
 .../main/java/org/apache/beam/sdk/io/Read.java  |  6 --
 .../java/org/apache/beam/sdk/io/TFRecordIO.java | 18 +----
 .../java/org/apache/beam/sdk/io/TextIO.java     | 12 ----
 .../org/apache/beam/sdk/transforms/Combine.java | 22 ++----
 .../org/apache/beam/sdk/transforms/Create.java  | 70 +++++++++-----------
 .../org/apache/beam/sdk/transforms/Filter.java  | 26 ++++----
 .../apache/beam/sdk/transforms/PTransform.java  | 13 +++-
 .../org/apache/beam/sdk/transforms/ParDo.java   | 59 ++++++++---------
 .../beam/sdk/transforms/windowing/Window.java   |  6 --
 .../beam/sdk/runners/TransformTreeTest.java     |  7 --
 .../apache/beam/sdk/transforms/CreateTest.java  | 27 +++-----
 .../beam/sdk/extensions/sorter/SortValues.java  | 20 +++---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 18 +----
 .../io/gcp/bigquery/PassThroughThenCleanup.java |  2 +-
 .../sdk/io/gcp/bigquery/StreamingInserts.java   |  8 ---
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 13 +---
 24 files changed, 149 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 435ffab..cfc413c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -25,7 +25,6 @@ import java.net.URISyntaxException;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -155,11 +154,6 @@ public class TfIdf {
     }
 
     @Override
-    public Coder<?> getDefaultOutputCoder() {
-      return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
-    }
-
-    @Override
     public PCollection<KV<URI, String>> expand(PBegin input) {
       Pipeline pipeline = input.getPipeline();
 
@@ -179,9 +173,11 @@ public class TfIdf {
           uriString = uri.toString();
         }
 
-        PCollection<KV<URI, String>> oneUriToLines = pipeline
-            .apply("TextIO.Read(" + uriString + ")", TextIO.read().from(uriString))
-            .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
+        PCollection<KV<URI, String>> oneUriToLines =
+            pipeline
+                .apply("TextIO.Read(" + uriString + ")", TextIO.read().from(uriString))
+                .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri))
+                .setCoder(KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()));
 
         urisToLines = urisToLines.and(oneUriToLines);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
index ca25ba7..ccf41f3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.core.construction;
 
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -37,7 +36,16 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
 
   @Override
   public OutputT expand(InputT input) {
-    return delegate().expand(input);
+    OutputT res = delegate().expand(input);
+    if (res instanceof PCollection) {
+      PCollection pc = (PCollection) res;
+      try {
+        pc.setCoder(delegate().getDefaultOutputCoder(input, pc));
+      } catch (CannotProvideCoderException e) {
+        // Let coder inference happen later.
+      }
+    }
+    return res;
   }
 
   @Override
@@ -51,12 +59,6 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
   }
 
   @Override
-  public <T> Coder<T> getDefaultOutputCoder(InputT input, PCollection<T> output)
-      throws CannotProvideCoderException {
-    return delegate().getDefaultOutputCoder(input, output);
-  }
-
-  @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     builder.delegate(delegate());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
index 74c056c..4741b6b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -90,14 +91,24 @@ public class ForwardingPTransformTest {
   @Test
   public void getDefaultOutputCoderDelegates() throws Exception {
     @SuppressWarnings("unchecked")
-    PCollection<Integer> input = Mockito.mock(PCollection.class);
+    PCollection<Integer> input =
+        PCollection.createPrimitiveOutputInternal(
+            null /* pipeline */,
+            WindowingStrategy.globalDefault(),
+            PCollection.IsBounded.BOUNDED,
+            null /* coder */);
     @SuppressWarnings("unchecked")
-    PCollection<String> output = Mockito.mock(PCollection.class);
+    PCollection<String> output = PCollection.createPrimitiveOutputInternal(
+        null /* pipeline */,
+        WindowingStrategy.globalDefault(),
+        PCollection.IsBounded.BOUNDED,
+        null /* coder */);
     @SuppressWarnings("unchecked")
     Coder<String> outputCoder = Mockito.mock(Coder.class);
 
+    Mockito.when(delegate.expand(input)).thenReturn(output);
     Mockito.when(delegate.getDefaultOutputCoder(input, output)).thenReturn(outputCoder);
-    assertThat(forwarding.getDefaultOutputCoder(input, output), equalTo(outputCoder));
+    assertThat(forwarding.expand(input).getCoder(), equalTo(outputCoder));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
index 6f49e94..ae21b4d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
@@ -213,8 +213,7 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
                     inputCoder.getKeyCoder())))
         .setCoder(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder))
         .apply(GroupByKey.<K, AccumT>create())
-        .apply(new MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>(combineFn))
-        .setCoder(outputCoder);
+        .apply(new MergeAndExtractAccumulatorOutput<>(combineFn, outputCoder));
   }
 
   private static class CombineInputs<K, InputT, AccumT> extends DoFn<KV<K, InputT>, KV<K, AccumT>> {
@@ -320,9 +319,12 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
   static class MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>
       extends RawPTransform<PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>> {
     private final CombineFn<?, AccumT, OutputT> combineFn;
+    private final Coder<KV<K, OutputT>> outputCoder;
 
-    private MergeAndExtractAccumulatorOutput(CombineFn<?, AccumT, OutputT> combineFn) {
+    private MergeAndExtractAccumulatorOutput(
+        CombineFn<?, AccumT, OutputT> combineFn, Coder<KV<K, OutputT>> outputCoder) {
       this.combineFn = combineFn;
+      this.outputCoder = outputCoder;
     }
 
     CombineFn<?, AccumT, OutputT> getCombineFn() {
@@ -332,7 +334,7 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
     @Override
     public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, Iterable<AccumT>>> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), outputCoder);
     }
 
     @Nullable

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
index d015d2b..7d1dadb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.dataflow;
 
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -69,7 +68,7 @@ class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
         public void processElement(ProcessContext c) throws Exception {
           c.output(c.element());
         }
-      })).setWindowingStrategyInternal(outputStrategy);
+      })).setWindowingStrategyInternal(outputStrategy).setCoder(input.getCoder());
     }
   }
 
@@ -79,11 +78,6 @@ class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
   }
 
   @Override
-  protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
-    return input.getCoder();
-  }
-
-  @Override
   protected String getKindString() {
     return "Window.Into()";
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 7a99f75..f756065 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -606,11 +606,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
       // Return a value unrelated to the input.
       return input.getPipeline().apply(Create.of(1, 2, 3, 4));
     }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder() {
-      return VarIntCoder.of();
-    }
   }
 
   /**
@@ -626,11 +621,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
       return PDone.in(input.getPipeline());
     }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder() {
-      return VoidCoder.of();
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 824f725..cd5857c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -996,11 +995,6 @@ public class AvroIO {
               DisplayData.item("tempDirectory", tempDirectory)
                   .withLabel("Directory for temporary files"));
     }
-
-    @Override
-    protected Coder<Void> getDefaultOutputCoder() {
-      return VoidCoder.of();
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 8505ca4..80a03eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -114,12 +114,8 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
             }
           }));
     }
-    return read.apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>()));
-  }
-
-  @Override
-  protected Coder<T> getDefaultOutputCoder() {
-    return source.getOutputCoder();
+    return read.apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>()))
+        .setCoder(source.getOutputCoder());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 574ba0c..9b273f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io;
 
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.NameUtils;
@@ -160,11 +159,6 @@ public class Read {
     }
 
     @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return source.getOutputCoder();
-    }
-
-    @Override
     public final PCollection<T> expand(PBegin input) {
       source.validate();
       return PCollection.createPrimitiveOutputInternal(

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 1b2e95b..c75051f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -35,8 +35,6 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -171,11 +169,7 @@ public class TFRecordIO {
         }
       }
 
-      final Bounded<byte[]> read = org.apache.beam.sdk.io.Read.from(getSource());
-      PCollection<byte[]> pcol = input.getPipeline().apply("Read", read);
-      // Honor the default output coder that would have been used by this PTransform.
-      pcol.setCoder(getDefaultOutputCoder());
-      return pcol;
+      return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
     }
 
     // Helper to create a source specific to the requested compression type.
@@ -212,11 +206,6 @@ public class TFRecordIO {
           .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
               .withLabel("File Pattern"));
     }
-
-    @Override
-    protected Coder<byte[]> getDefaultOutputCoder() {
-      return ByteArrayCoder.of();
-    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -391,11 +380,6 @@ public class TFRecordIO {
           .add(DisplayData.item("compressionType", getCompressionType().toString())
               .withLabel("Compression Type"));
     }
-
-    @Override
-    protected Coder<Void> getDefaultOutputCoder() {
-      return VoidCoder.of();
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 312dc07..9a14ad9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -30,9 +30,7 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.CompressedSource.CompressionMode;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
@@ -337,11 +335,6 @@ public class TextIO {
           .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
             .withLabel("File Pattern"));
     }
-
-    @Override
-    protected Coder<String> getDefaultOutputCoder() {
-      return StringUtf8Coder.of();
-    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -813,11 +806,6 @@ public class TextIO {
                       "writableByteChannelFactory", getWritableByteChannelFactory().toString())
                   .withLabel("Compression/Transformation Type"));
     }
-
-    @Override
-    protected Coder<Void> getDefaultOutputCoder() {
-      return VoidCoder.of();
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index c195352..fab98f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -2156,8 +2156,13 @@ public class Combine {
           }).withSideInputs(sideInputs));
 
       try {
-        Coder<KV<K, OutputT>> outputCoder = getDefaultOutputCoder(input);
-        output.setCoder(outputCoder);
+        KvCoder<K, InputT> kvCoder = getKvCoder(input.getCoder());
+        @SuppressWarnings("unchecked")
+        Coder<OutputT> outputValueCoder =
+            ((GlobalCombineFn<InputT, ?, OutputT>) fn)
+                .getDefaultOutputCoder(
+                    input.getPipeline().getCoderRegistry(), kvCoder.getValueCoder());
+        output.setCoder(KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder));
       } catch (CannotProvideCoderException exc) {
         // let coder inference happen later, if it can
       }
@@ -2200,19 +2205,6 @@ public class Combine {
     }
 
     @Override
-    public Coder<KV<K, OutputT>> getDefaultOutputCoder(
-        PCollection<? extends KV<K, ? extends Iterable<InputT>>> input)
-        throws CannotProvideCoderException {
-      KvCoder<K, InputT> kvCoder = getKvCoder(input.getCoder());
-      @SuppressWarnings("unchecked")
-      Coder<OutputT> outputValueCoder =
-          ((GlobalCombineFn<InputT, ?, OutputT>) fn)
-              .getDefaultOutputCoder(
-                  input.getPipeline().getCoderRegistry(), kvCoder.getValueCoder());
-      return KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder);
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       Combine.populateDisplayData(builder, fn, fnDisplayData);

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index a28e9b2..2635bc8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -315,29 +315,25 @@ public class Create<T> {
 
     @Override
     public PCollection<T> expand(PBegin input) {
+      Coder<T> coder;
       try {
-        Coder<T> coder = getDefaultOutputCoder(input);
-        try {
-          CreateSource<T> source = CreateSource.fromIterable(elems, coder);
-          return input.getPipeline().apply(Read.from(source));
-        } catch (IOException e) {
-          throw new RuntimeException(
-              String.format("Unable to apply Create %s using Coder %s.", this, coder), e);
-        }
+        CoderRegistry registry = input.getPipeline().getCoderRegistry();
+        coder =
+            this.coder.isPresent()
+                ? this.coder.get()
+                : typeDescriptor.isPresent()
+                    ? registry.getCoder(typeDescriptor.get())
+                    : getDefaultCreateCoder(registry, elems);
       } catch (CannotProvideCoderException e) {
         throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
             + "Please set a coder by invoking Create.withCoder() explicitly.", e);
       }
-    }
-
-    @Override
-    public Coder<T> getDefaultOutputCoder(PBegin input) throws CannotProvideCoderException {
-      if (coder.isPresent()) {
-        return coder.get();
-      } else if (typeDescriptor.isPresent()) {
-        return input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
-      } else {
-        return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), elems);
+      try {
+        CreateSource<T> source = CreateSource.fromIterable(elems, coder);
+        return input.getPipeline().apply(Read.from(source));
+      } catch (IOException e) {
+        throw new RuntimeException(
+            String.format("Unable to apply Create %s using Coder %s.", this, coder), e);
       }
     }
 
@@ -570,7 +566,23 @@ public class Create<T> {
     @Override
     public PCollection<T> expand(PBegin input) {
       try {
-        Coder<T> coder = getDefaultOutputCoder(input);
+        Coder<T> coder;
+        if (elementCoder.isPresent()) {
+          coder = elementCoder.get();
+        } else if (typeDescriptor.isPresent()) {
+          coder = input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
+        } else {
+          Iterable<T> rawElements =
+              Iterables.transform(
+                  timestampedElements,
+                  new Function<TimestampedValue<T>, T>() {
+                    @Override
+                    public T apply(TimestampedValue<T> timestampedValue) {
+                      return timestampedValue.getValue();
+                    }
+                  });
+          coder = getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), rawElements);
+        }
 
         PCollection<TimestampedValue<T>> intermediate = Pipeline.applyTransform(input,
             Create.of(timestampedElements).withCoder(TimestampedValueCoder.of(coder)));
@@ -610,26 +622,6 @@ public class Create<T> {
         c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
       }
     }
-
-    @Override
-    public Coder<T> getDefaultOutputCoder(PBegin input) throws CannotProvideCoderException {
-      if (elementCoder.isPresent()) {
-        return elementCoder.get();
-      } else if (typeDescriptor.isPresent()) {
-        return input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
-      } else {
-        Iterable<T> rawElements =
-            Iterables.transform(
-                timestampedElements,
-                new Function<TimestampedValue<T>, T>() {
-                  @Override
-                  public T apply(TimestampedValue<T> input) {
-                    return input.getValue();
-                  }
-                });
-        return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), rawElements);
-      }
-    }
   }
 
   private static <T> Coder<T> getDefaultCreateCoder(CoderRegistry registry, Iterable<T> elems)

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index d0314eb..2fd12de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -229,19 +228,18 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
 
   @Override
   public PCollection<T> expand(PCollection<T> input) {
-    return input.apply(ParDo.of(new DoFn<T, T>() {
-      @ProcessElement
-      public void processElement(ProcessContext c) {
-        if (predicate.apply(c.element())) {
-          c.output(c.element());
-        }
-      }
-    }));
-  }
-
-  @Override
-  protected Coder<T> getDefaultOutputCoder(PCollection<T> input) {
-    return input.getCoder();
+    return input
+        .apply(
+            ParDo.of(
+                new DoFn<T, T>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    if (predicate.apply(c.element())) {
+                      c.output(c.element());
+                    }
+                  }
+                }))
+        .setCoder(input.getCoder());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 58051df..f5e7830 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -277,13 +277,16 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
   }
 
   /**
-   * Returns the default {@code Coder} to use for the output of this
-   * single-output {@code PTransform}.
+   * Returns the default {@code Coder} to use for the output of this single-output {@code
+   * PTransform}.
    *
    * <p>By default, always throws
    *
    * @throws CannotProvideCoderException if no coder can be inferred
+   * @deprecated Instead, the PTransform should explicitly call {@link PCollection#setCoder} on the
+   *     returned PCollection.
    */
+  @Deprecated
   protected Coder<?> getDefaultOutputCoder() throws CannotProvideCoderException {
     throw new CannotProvideCoderException("PTransform.getOutputCoder called.");
   }
@@ -295,7 +298,10 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
    * <p>By default, always throws.
    *
    * @throws CannotProvideCoderException if none can be inferred.
+   * @deprecated Instead, the PTransform should explicitly call {@link PCollection#setCoder} on the
+   *     returned PCollection.
    */
+  @Deprecated
   protected Coder<?> getDefaultOutputCoder(@SuppressWarnings("unused") InputT input)
       throws CannotProvideCoderException {
     return getDefaultOutputCoder();
@@ -308,7 +314,10 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
    * <p>By default, always throws.
    *
    * @throws CannotProvideCoderException if none can be inferred.
+   * @deprecated Instead, the PTransform should explicitly call {@link PCollection#setCoder} on the
+   *     returned PCollection.
    */
+  @Deprecated
   public <T> Coder<T> getDefaultOutputCoder(
       InputT input, @SuppressWarnings("unused") PCollection<T> output)
       throws CannotProvideCoderException {

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index bc4f629..a0e1eb2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -636,19 +636,21 @@ public class ParDo {
 
     @Override
     public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
-      finishSpecifyingStateSpecs(fn, input.getPipeline().getCoderRegistry(), input.getCoder());
+      CoderRegistry registry = input.getPipeline().getCoderRegistry();
+      finishSpecifyingStateSpecs(fn, registry, input.getCoder());
       TupleTag<OutputT> mainOutput = new TupleTag<>();
-      return input.apply(withOutputTags(mainOutput, TupleTagList.empty())).get(mainOutput);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input)
-        throws CannotProvideCoderException {
-      return input.getPipeline().getCoderRegistry().getCoder(
-          getFn().getOutputTypeDescriptor(),
-          getFn().getInputTypeDescriptor(),
-          ((PCollection<InputT>) input).getCoder());
+      PCollection<OutputT> res =
+          input.apply(withOutputTags(mainOutput, TupleTagList.empty())).get(mainOutput);
+      try {
+        res.setCoder(
+            registry.getCoder(
+                getFn().getOutputTypeDescriptor(),
+                getFn().getInputTypeDescriptor(),
+                ((PCollection<InputT>) input).getCoder()));
+      } catch (CannotProvideCoderException e) {
+        // Ignore and leave coder unset.
+      }
+      return res;
     }
 
     @Override
@@ -757,7 +759,8 @@ public class ParDo {
       validateWindowType(input, fn);
 
       // Use coder registry to determine coders for all StateSpec defined in the fn signature.
-      finishSpecifyingStateSpecs(fn, input.getPipeline().getCoderRegistry(), input.getCoder());
+      CoderRegistry registry = input.getPipeline().getCoderRegistry();
+      finishSpecifyingStateSpecs(fn, registry, input.getCoder());
 
       DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
       if (signature.usesState() || signature.usesTimers()) {
@@ -771,6 +774,18 @@ public class ParDo {
           Collections.<TupleTag<?>, Coder<?>>emptyMap(),
           input.getWindowingStrategy(),
           input.isBounded());
+      @SuppressWarnings("unchecked")
+      Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
+      for (PCollection<?> out : outputs.getAll().values()) {
+        try {
+          out.setCoder(
+              (Coder)
+                  registry.getCoder(
+                      out.getTypeDescriptor(), getFn().getInputTypeDescriptor(), inputCoder));
+        } catch (CannotProvideCoderException e) {
+          // Ignore and let coder inference happen later.
+        }
+      }
 
       // The fn will likely be an instance of an anonymous subclass
       // such as DoFn<Integer, String> { }, thus will have a high-fidelity
@@ -781,24 +796,6 @@ public class ParDo {
     }
 
     @Override
-    protected Coder<OutputT> getDefaultOutputCoder() {
-      throw new RuntimeException(
-          "internal error: shouldn't be calling this on a multi-output ParDo");
-    }
-
-    @Override
-    public <T> Coder<T> getDefaultOutputCoder(
-        PCollection<? extends InputT> input, PCollection<T> output)
-        throws CannotProvideCoderException {
-      @SuppressWarnings("unchecked")
-      Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
-      return input.getPipeline().getCoderRegistry().getCoder(
-          output.getTypeDescriptor(),
-          getFn().getInputTypeDescriptor(),
-          inputCoder);
-      }
-
-    @Override
     protected String getKindString() {
       return String.format("ParMultiDo(%s)", NameUtils.approximateSimpleName(getFn()));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index af583e5..2337798 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Ordering;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -453,11 +452,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
   }
 
   @Override
-  protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
-    return input.getCoder();
-  }
-
-  @Override
   protected String getKindString() {
     return "Window.Into()";
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index bf06d78..9956d5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -27,9 +27,7 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.EnumSet;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.WriteFiles;
@@ -109,11 +107,6 @@ public class TransformTreeTest {
 
       return PDone.in(input.getPipeline());
     }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder() {
-      return VoidCoder.of();
-    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 6be6772..81ad947 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -62,7 +62,6 @@ import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -314,28 +313,24 @@ public class CreateTest {
   @Test
   public void testCreateTimestampedDefaultOutputCoderUsingCoder() throws Exception {
     Coder<Record> coder = new RecordCoder();
-    PBegin pBegin = PBegin.in(p);
     Create.TimestampedValues<Record> values =
         Create.timestamped(
             TimestampedValue.of(new Record(), new Instant(0)),
             TimestampedValue.<Record>of(new Record2(), new Instant(0)))
             .withCoder(coder);
-    Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
-    assertThat(defaultCoder, equalTo(coder));
+    assertThat(p.apply(values).getCoder(), equalTo(coder));
   }
 
   @Test
   public void testCreateTimestampedDefaultOutputCoderUsingTypeDescriptor() throws Exception {
     Coder<Record> coder = new RecordCoder();
     p.getCoderRegistry().registerCoderForClass(Record.class, coder);
-    PBegin pBegin = PBegin.in(p);
     Create.TimestampedValues<Record> values =
         Create.timestamped(
             TimestampedValue.of(new Record(), new Instant(0)),
             TimestampedValue.<Record>of(new Record2(), new Instant(0)))
             .withType(new TypeDescriptor<Record>() {});
-    Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
-    assertThat(defaultCoder, equalTo(coder));
+    assertThat(p.apply(values).getCoder(), equalTo(coder));
   }
 
   @Test
@@ -417,31 +412,25 @@ public class CreateTest {
   public void testCreateDefaultOutputCoderUsingInference() throws Exception {
     Coder<Record> coder = new RecordCoder();
     p.getCoderRegistry().registerCoderForClass(Record.class, coder);
-    PBegin pBegin = PBegin.in(p);
-    Create.Values<Record> values = Create.of(new Record(), new Record(), new Record());
-    Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
-    assertThat(defaultCoder, equalTo(coder));
+    assertThat(
+        p.apply(Create.of(new Record(), new Record(), new Record())).getCoder(), equalTo(coder));
   }
 
   @Test
   public void testCreateDefaultOutputCoderUsingCoder() throws Exception {
     Coder<Record> coder = new RecordCoder();
-    PBegin pBegin = PBegin.in(p);
-    Create.Values<Record> values =
-        Create.of(new Record(), new Record2()).withCoder(coder);
-    Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
-    assertThat(defaultCoder, equalTo(coder));
+    assertThat(
+        p.apply(Create.of(new Record(), new Record2()).withCoder(coder)).getCoder(),
+        equalTo(coder));
   }
 
   @Test
   public void testCreateDefaultOutputCoderUsingTypeDescriptor() throws Exception {
     Coder<Record> coder = new RecordCoder();
     p.getCoderRegistry().registerCoderForClass(Record.class, coder);
-    PBegin pBegin = PBegin.in(p);
     Create.Values<Record> values =
         Create.of(new Record(), new Record2()).withType(new TypeDescriptor<Record>() {});
-    Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
-    assertThat(defaultCoder, equalTo(coder));
+    assertThat(p.apply(values).getCoder(), equalTo(coder));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
index d1b4d07..cb9d984 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
@@ -76,18 +76,14 @@ public class SortValues<PrimaryKeyT, SecondaryKeyT, ValueT>
   @Override
   public PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> expand(
       PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> input) {
-    return input.apply(
-        ParDo.of(
-            new SortValuesDoFn<PrimaryKeyT, SecondaryKeyT, ValueT>(
-                sorterOptions,
-                getSecondaryKeyCoder(input.getCoder()),
-                getValueCoder(input.getCoder()))));
-  }
-
-  @Override
-  protected Coder<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> getDefaultOutputCoder(
-      PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> input) {
-    return input.getCoder();
+    return input
+        .apply(
+            ParDo.of(
+                new SortValuesDoFn<PrimaryKeyT, SecondaryKeyT, ValueT>(
+                    sorterOptions,
+                    getSecondaryKeyCoder(input.getCoder()),
+                    getValueCoder(input.getCoder()))))
+        .setCoder(input.getCoder());
   }
 
   /** Retrieves the {@link Coder} for the secondary key-value pairs. */

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7ca4ce2..6edbd06 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -50,7 +50,6 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileSystems;
@@ -543,9 +542,7 @@ public class BigQueryIO {
             p.apply("TriggerIdCreation", Create.of(staticJobUuid))
                 .apply("ViewId", View.<String>asSingleton());
         // Apply the traditional Source model.
-        rows =
-            p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid)))
-                .setCoder(getDefaultOutputCoder());
+        rows = p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid)));
       } else {
         // Create a singleton job ID token at execution time.
         jobIdTokenCollection =
@@ -625,7 +622,8 @@ public class BigQueryIO {
                                 }
                               }
                             })
-                        .withSideInputs(schemaView, jobIdTokenView));
+                        .withSideInputs(schemaView, jobIdTokenView))
+                        .setCoder(TableRowJsonCoder.of());
       }
       PassThroughThenCleanup.CleanupOperation cleanupOperation =
           new PassThroughThenCleanup.CleanupOperation() {
@@ -658,11 +656,6 @@ public class BigQueryIO {
     }
 
     @Override
-    protected Coder<TableRow> getDefaultOutputCoder() {
-      return TableRowJsonCoder.of();
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder
@@ -1141,11 +1134,6 @@ public class BigQueryIO {
     }
 
     @Override
-    protected Coder<Void> getDefaultOutputCoder() {
-      return VoidCoder.of();
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
index de26c8d..2f7da08 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
@@ -74,7 +74,7 @@ class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T
                     })
                 .withSideInputs(jobIdSideInput, cleanupSignalView));
 
-    return outputs.get(mainOutput);
+    return outputs.get(mainOutput).setCoder(input.getCoder());
   }
 
   private static class IdentityFn<T> extends DoFn<T, T> {

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index ba09cb3..747f2b0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -19,8 +19,6 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
@@ -67,12 +65,6 @@ public class StreamingInserts<DestinationT>
     return new StreamingInserts<>(
         createDisposition, dynamicDestinations, bigQueryServices, retryPolicy);  }
 
-
-  @Override
-  protected Coder<Void> getDefaultOutputCoder() {
-    return VoidCoder.of();
-  }
-
   @Override
   public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
     PCollection<KV<TableDestination, TableRow>> writes =

http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 4f33d61..46c2df4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
@@ -727,7 +726,7 @@ public class PubsubIO {
               getTimestampAttribute(),
               getIdAttribute(),
               getNeedsAttributes());
-      return input.apply(source).apply(MapElements.via(getParseFn()));
+      return input.apply(source).apply(MapElements.via(getParseFn())).setCoder(getCoder());
     }
 
     @Override
@@ -743,11 +742,6 @@ public class PubsubIO {
             .withLabel("Pubsub Subscription"));
       }
     }
-
-    @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return getCoder();
-    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -870,11 +864,6 @@ public class PubsubIO {
           builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
     }
 
-    @Override
-    protected Coder<Void> getDefaultOutputCoder() {
-      return VoidCoder.of();
-    }
-
     /**
      * Writer to Pubsub which batches messages from bounded collections.
      *


[6/6] beam git commit: This closes #3649: [BEAM-2686] Towards deprecating PCollection.setCoder()

Posted by jk...@apache.org.
This closes #3649: [BEAM-2686] Towards deprecating PCollection.setCoder()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e6530ad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e6530ad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e6530ad

Branch: refs/heads/master
Commit: 9e6530adb00669b7cf0f01cb8b128be0a21fd721
Parents: 38f1890 48690bc
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 17:45:41 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 17:45:41 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/complete/TfIdf.java    |  14 +-
 .../apache/beam/runners/apex/ApexRunner.java    |  10 +-
 .../FlattenPCollectionTranslator.java           |  15 +-
 .../apex/translation/ParDoTranslator.java       |   7 +-
 .../apex/translation/utils/ValuesSource.java    |   2 +-
 .../apex/examples/UnboundedTextSource.java      |   2 +-
 .../translation/ApexGroupByKeyOperatorTest.java |   6 +-
 .../translation/GroupByKeyTranslatorTest.java   |   2 +-
 .../translation/utils/CollectionSource.java     |   2 +-
 .../core/construction/ForwardingPTransform.java |  18 ++-
 .../construction/PCollectionTranslation.java    |   8 +-
 .../construction/PTransformReplacements.java    |   6 +
 .../core/construction/PrimitiveCreate.java      |  14 +-
 .../core/construction/SplittableParDo.java      |  62 ++++----
 .../UnboundedReadFromBoundedSource.java         |  11 +-
 .../construction/ForwardingPTransformTest.java  |  17 ++-
 .../construction/PTransformMatchersTest.java    |  79 +++++-----
 .../core/construction/ReadTranslationTest.java  |   4 +-
 .../construction/ReplacementOutputsTest.java    |  14 +-
 .../core/construction/SplittableParDoTest.java  |  33 +++--
 .../UnboundedReadFromBoundedSourceTest.java     |   2 +-
 .../core/GroupByKeyViaGroupByKeyOnly.java       |  15 +-
 .../core/SplittableParDoViaKeyedWorkItems.java  |  10 +-
 .../beam/runners/direct/DirectGroupByKey.java   |  31 ++--
 .../beam/runners/direct/MultiStepCombine.java   |  10 +-
 .../direct/ParDoMultiOverrideFactory.java       |   5 +-
 .../direct/TestStreamEvaluatorFactory.java      |   8 +-
 .../runners/direct/ViewOverrideFactory.java     |   5 +-
 .../direct/BoundedReadEvaluatorFactoryTest.java |   2 +-
 .../runners/direct/CommittedResultTest.java     |  26 +++-
 .../beam/runners/direct/DirectRunnerTest.java   |   4 +-
 .../runners/direct/EvaluationContextTest.java   |   8 +-
 .../UnboundedReadEvaluatorFactoryTest.java      |   2 +-
 .../runners/flink/CreateStreamingFlinkView.java |   5 +-
 .../streaming/io/UnboundedSocketSource.java     |   2 +-
 .../flink/streaming/TestCountingSource.java     |   2 +-
 .../beam/runners/dataflow/AssignWindows.java    |  12 +-
 .../runners/dataflow/BatchViewOverrides.java    |  19 +--
 .../runners/dataflow/CreateDataflowView.java    |   5 +-
 .../beam/runners/dataflow/DataflowRunner.java   | 147 ++++++++++---------
 .../dataflow/PrimitiveParDoSingleFactory.java   |  12 +-
 .../dataflow/SplittableParDoOverrides.java      |   2 +-
 .../DataflowPipelineTranslatorTest.java         |  21 +--
 .../runners/dataflow/DataflowRunnerTest.java    |  11 +-
 .../transforms/DataflowGroupByKeyTest.java      |  12 +-
 .../dataflow/transforms/DataflowViewTest.java   |  14 +-
 .../beam/runners/spark/io/CreateStream.java     |  11 +-
 .../beam/runners/spark/io/MicrobatchSource.java |   4 +-
 .../runners/spark/io/SparkUnboundedSource.java  |   2 +-
 .../spark/stateful/StateSpecFunctions.java      |   2 +-
 .../translation/StorageLevelPTransform.java     |  10 +-
 .../util/SinglePrimitiveOutputPTransform.java   |  51 -------
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   6 -
 .../java/org/apache/beam/sdk/io/AvroSource.java |   2 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |  12 +-
 .../apache/beam/sdk/io/CompressedSource.java    |   6 +-
 .../org/apache/beam/sdk/io/CountingSource.java  |   4 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |  27 ++--
 .../java/org/apache/beam/sdk/io/Source.java     |  14 +-
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  20 +--
 .../java/org/apache/beam/sdk/io/TextIO.java     |  12 --
 .../java/org/apache/beam/sdk/io/TextSource.java |   2 +-
 .../beam/sdk/testing/SourceTestUtils.java       |  12 +-
 .../org/apache/beam/sdk/testing/TestStream.java |   5 +-
 .../org/apache/beam/sdk/transforms/Combine.java |  22 +--
 .../org/apache/beam/sdk/transforms/Create.java  |  72 ++++-----
 .../org/apache/beam/sdk/transforms/Filter.java  |  26 ++--
 .../org/apache/beam/sdk/transforms/Flatten.java |  22 +--
 .../apache/beam/sdk/transforms/GroupByKey.java  |  12 +-
 .../apache/beam/sdk/transforms/PTransform.java  |  13 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  61 ++++----
 .../org/apache/beam/sdk/transforms/View.java    |   5 +-
 .../beam/sdk/transforms/windowing/Window.java   |   8 +-
 .../org/apache/beam/sdk/values/PCollection.java |   9 +-
 .../beam/sdk/values/PCollectionTuple.java       |  10 +-
 .../beam/sdk/io/CompressedSourceTest.java       |   2 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   2 +-
 .../beam/sdk/io/OffsetBasedSourceTest.java      |   2 +-
 .../java/org/apache/beam/sdk/io/ReadTest.java   |   4 +-
 .../sdk/runners/TransformHierarchyTest.java     |  41 +++---
 .../beam/sdk/runners/TransformTreeTest.java     |  19 +--
 .../runners/dataflow/TestCountingSource.java    |   2 +-
 .../apache/beam/sdk/transforms/CreateTest.java  |  31 ++--
 .../apache/beam/sdk/transforms/FlattenTest.java |   2 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |  11 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |  11 +-
 .../beam/sdk/values/PCollectionTupleTest.java   |   7 +-
 .../beam/sdk/extensions/sorter/SortValues.java  |  20 +--
 .../org/apache/beam/sdk/io/amqp/AmqpIO.java     |   2 +-
 .../beam/sdk/io/cassandra/CassandraIO.java      |   2 +-
 .../sdk/io/elasticsearch/ElasticsearchIO.java   |   2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  18 +--
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java |   4 +-
 .../io/gcp/bigquery/PassThroughThenCleanup.java |   2 +-
 .../sdk/io/gcp/bigquery/StreamingInserts.java   |   8 -
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    |   2 +-
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java |  13 +-
 .../io/gcp/pubsub/PubsubUnboundedSource.java    |   2 +-
 .../hadoop/inputformat/HadoopInputFormatIO.java |   2 +-
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   |   2 +-
 .../apache/beam/sdk/io/hcatalog/HCatalogIO.java |   2 +-
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  |   2 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |   2 +-
 .../beam/sdk/io/kinesis/KinesisSource.java      |   2 +-
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    |   2 +-
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java   |   2 +-
 .../org/apache/beam/sdk/io/mqtt/MqttIO.java     |   2 +-
 .../org/apache/beam/sdk/io/xml/XmlSource.java   |   2 +-
 108 files changed, 631 insertions(+), 778 deletions(-)
----------------------------------------------------------------------



[4/6] beam git commit: Makes all Source classes override getOutputCoder instead of getDefaultOutputCoder

Posted by jk...@apache.org.
Makes all Source classes override getOutputCoder instead of getDefaultOutputCoder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e017a0ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e017a0ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e017a0ec

Branch: refs/heads/master
Commit: e017a0ec8a16b63828d0955f405b23bc9771bc9e
Parents: 38f1890
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 26 16:05:27 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 15:40:46 2017 -0700

----------------------------------------------------------------------
 .../runners/apex/translation/utils/ValuesSource.java  |  2 +-
 .../runners/apex/examples/UnboundedTextSource.java    |  2 +-
 .../apex/translation/GroupByKeyTranslatorTest.java    |  2 +-
 .../apex/translation/utils/CollectionSource.java      |  2 +-
 .../construction/UnboundedReadFromBoundedSource.java  |  8 ++++----
 .../core/construction/ReadTranslationTest.java        |  4 ++--
 .../UnboundedReadFromBoundedSourceTest.java           |  2 +-
 .../direct/BoundedReadEvaluatorFactoryTest.java       |  2 +-
 .../apache/beam/runners/direct/DirectRunnerTest.java  |  4 ++--
 .../direct/UnboundedReadEvaluatorFactoryTest.java     |  2 +-
 .../wrappers/streaming/io/UnboundedSocketSource.java  |  2 +-
 .../runners/flink/streaming/TestCountingSource.java   |  2 +-
 .../apache/beam/runners/dataflow/DataflowRunner.java  |  6 +++---
 .../beam/runners/spark/io/MicrobatchSource.java       |  4 ++--
 .../beam/runners/spark/io/SparkUnboundedSource.java   |  2 +-
 .../runners/spark/stateful/StateSpecFunctions.java    |  2 +-
 .../main/java/org/apache/beam/sdk/io/AvroSource.java  |  2 +-
 .../beam/sdk/io/BoundedReadFromUnboundedSource.java   |  6 +++---
 .../java/org/apache/beam/sdk/io/CompressedSource.java |  6 +++---
 .../java/org/apache/beam/sdk/io/CountingSource.java   |  4 ++--
 .../src/main/java/org/apache/beam/sdk/io/Read.java    |  4 ++--
 .../src/main/java/org/apache/beam/sdk/io/Source.java  | 14 ++++++++++----
 .../main/java/org/apache/beam/sdk/io/TFRecordIO.java  |  2 +-
 .../main/java/org/apache/beam/sdk/io/TextSource.java  |  2 +-
 .../org/apache/beam/sdk/testing/SourceTestUtils.java  | 12 ++++++------
 .../java/org/apache/beam/sdk/transforms/Create.java   |  2 +-
 .../org/apache/beam/sdk/io/CompressedSourceTest.java  |  2 +-
 .../org/apache/beam/sdk/io/FileBasedSourceTest.java   |  2 +-
 .../org/apache/beam/sdk/io/OffsetBasedSourceTest.java |  2 +-
 .../test/java/org/apache/beam/sdk/io/ReadTest.java    |  4 ++--
 .../beam/sdk/runners/dataflow/TestCountingSource.java |  2 +-
 .../org/apache/beam/sdk/transforms/CreateTest.java    |  4 ++--
 .../main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java |  2 +-
 .../org/apache/beam/sdk/io/cassandra/CassandraIO.java |  2 +-
 .../beam/sdk/io/elasticsearch/ElasticsearchIO.java    |  2 +-
 .../beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java  |  4 ++--
 .../apache/beam/sdk/io/gcp/bigtable/BigtableIO.java   |  2 +-
 .../beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java |  2 +-
 .../io/hadoop/inputformat/HadoopInputFormatIO.java    |  2 +-
 .../java/org/apache/beam/sdk/io/hbase/HBaseIO.java    |  2 +-
 .../org/apache/beam/sdk/io/hcatalog/HCatalogIO.java   |  2 +-
 .../main/java/org/apache/beam/sdk/io/jms/JmsIO.java   |  2 +-
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java    |  2 +-
 .../org/apache/beam/sdk/io/kinesis/KinesisSource.java |  2 +-
 .../apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java   |  2 +-
 .../org/apache/beam/sdk/io/mongodb/MongoDbIO.java     |  2 +-
 .../main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java |  2 +-
 .../java/org/apache/beam/sdk/io/xml/XmlSource.java    |  2 +-
 48 files changed, 79 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
index 41f027f..4a00ff1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
@@ -81,7 +81,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
   }
 
   @Override
-  public Coder<T> getDefaultOutputCoder() {
+  public Coder<T> getOutputCoder() {
     return iterableCoder.getElemCoder();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
index c590a2e..8f3e6bc 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
@@ -59,7 +59,7 @@ public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource
   }
 
   @Override
-  public Coder<String> getDefaultOutputCoder() {
+  public Coder<String> getOutputCoder() {
     return StringUtf8Coder.of();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index 9c61b47..58f33ae 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -153,7 +153,7 @@ public class GroupByKeyTranslatorTest {
     }
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
index 288aade..01a2a85 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
@@ -67,7 +67,7 @@ public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.Chec
   }
 
   @Override
-  public Coder<T> getDefaultOutputCoder() {
+  public Coder<T> getOutputCoder() {
     return coder;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 24eb384..f35f4c3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -92,7 +92,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
 
   @Override
   protected Coder<T> getDefaultOutputCoder() {
-    return source.getDefaultOutputCoder();
+    return source.getOutputCoder();
   }
 
   @Override
@@ -166,14 +166,14 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return boundedSource.getDefaultOutputCoder();
+    public Coder<T> getOutputCoder() {
+      return boundedSource.getOutputCoder();
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     @Override
     public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
-      return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
+      return new CheckpointCoder<>(boundedSource.getOutputCoder());
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
index 740b324..f85bd79 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
@@ -112,7 +112,7 @@ public class ReadTranslationTest {
     public void validate() {}
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
 
@@ -132,7 +132,7 @@ public class ReadTranslationTest {
     public void validate() {}
 
     @Override
-    public Coder<byte[]> getDefaultOutputCoder() {
+    public Coder<byte[]> getOutputCoder() {
       return ByteArrayCoder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
index 0e48a9d..62b06b7 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
@@ -320,7 +320,7 @@ public class UnboundedReadFromBoundedSourceTest {
     }
 
     @Override
-    public Coder<Byte> getDefaultOutputCoder() {
+    public Coder<Byte> getOutputCoder() {
       return SerializableCoder.of(Byte.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 6180d29..3d81884 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -395,7 +395,7 @@ public class BoundedReadEvaluatorFactoryTest {
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
+    public Coder<T> getOutputCoder() {
       return coder;
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 943d27c..d3f407a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -573,8 +573,8 @@ public class DirectRunnerTest implements Serializable {
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return underlying.getDefaultOutputCoder();
+    public Coder<T> getOutputCoder() {
+      return underlying.getOutputCoder();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 2a01db5..cc6847d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -477,7 +477,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     public void validate() {}
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
+    public Coder<T> getOutputCoder() {
       return coder;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index 910a33f..49e4ddc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -123,7 +123,7 @@ public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.Check
   }
 
   @Override
-  public Coder getDefaultOutputCoder() {
+  public Coder<String> getOutputCoder() {
     return DEFAULT_SOCKET_CODER;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index edf548a..fcb9282 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -238,7 +238,7 @@ public class TestCountingSource
   public void validate() {}
 
   @Override
-  public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
+  public Coder<KV<Integer, Integer>> getOutputCoder() {
     return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index f8d2c3c..8fce5b4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1176,7 +1176,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
+      return source.getOutputCoder();
     }
 
     @Override
@@ -1212,7 +1212,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
       @Override
       protected Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
-        return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getDefaultOutputCoder());
+        return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder());
       }
 
       @Override
@@ -1291,7 +1291,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
+      return source.getOutputCoder();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 3b48caf..ae873a3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -140,8 +140,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
   }
 
   @Override
-  public Coder<T> getDefaultOutputCoder() {
-    return source.getDefaultOutputCoder();
+  public Coder<T> getOutputCoder() {
+    return source.getOutputCoder();
   }
 
   public Coder<CheckpointMarkT> getCheckpointMarkCoder() {

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index b31aa9f..26af0c0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -116,7 +116,7 @@ public class SparkUnboundedSource {
     // output the actual (deserialized) stream.
     WindowedValue.FullWindowedValueCoder<T> coder =
         WindowedValue.FullWindowedValueCoder.of(
-            source.getDefaultOutputCoder(),
+            source.getOutputCoder(),
             GlobalWindow.Coder.INSTANCE);
     JavaDStream<WindowedValue<T>> readUnboundedStream =
         mapWithStateDStream

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 1b54478..ca54715 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -161,7 +161,7 @@ public class StateSpecFunctions {
         final List<byte[]> readValues = new ArrayList<>();
         WindowedValue.FullWindowedValueCoder<T> coder =
             WindowedValue.FullWindowedValueCoder.of(
-                source.getDefaultOutputCoder(),
+                source.getOutputCoder(),
                 GlobalWindow.Coder.INSTANCE);
         try {
           // measure how long a read takes per-partition.

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index d277503..8dd3125 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -315,7 +315,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   }
 
   @Override
-  public Coder<T> getDefaultOutputCoder() {
+  public Coder<T> getOutputCoder() {
     return mode.getOutputCoder();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index c882447..8505ca4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -119,7 +119,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
 
   @Override
   protected Coder<T> getDefaultOutputCoder() {
-    return source.getDefaultOutputCoder();
+    return source.getOutputCoder();
   }
 
   @Override
@@ -211,8 +211,8 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
     }
 
     @Override
-    public Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
-      return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getDefaultOutputCoder());
+    public Coder<ValueWithRecordId<T>> getOutputCoder() {
+      return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getOutputCoder());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index ad81b61..6943a02 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -404,11 +404,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
   }
 
   /**
-   * Returns the delegate source's default output coder.
+   * Returns the delegate source's output coder.
    */
   @Override
-  public final Coder<T> getDefaultOutputCoder() {
-    return sourceDelegate.getDefaultOutputCoder();
+  public final Coder<T> getOutputCoder() {
+    return sourceDelegate.getOutputCoder();
   }
 
   public final DecompressingChannelFactory getChannelFactory() {

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index 6202c2b..b47edc7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -188,7 +188,7 @@ public class CountingSource {
     }
 
     @Override
-    public Coder<Long> getDefaultOutputCoder() {
+    public Coder<Long> getOutputCoder() {
       return VarLongCoder.of();
     }
 
@@ -364,7 +364,7 @@ public class CountingSource {
     public void validate() {}
 
     @Override
-    public Coder<Long> getDefaultOutputCoder() {
+    public Coder<Long> getOutputCoder() {
       return VarLongCoder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index a07fca8..6e6750d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -96,7 +96,7 @@ public class Read {
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
+      return source.getOutputCoder();
     }
 
     @Override
@@ -164,7 +164,7 @@ public class Read {
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
+      return source.getOutputCoder();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
index 542d91c..32a7270 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
@@ -61,10 +61,16 @@ public abstract class Source<T> implements Serializable, HasDisplayData {
    */
   public abstract void validate();
 
-  /**
-   * Returns the default {@code Coder} to use for the data read from this source.
-   */
-  public abstract Coder<T> getDefaultOutputCoder();
+  /** @deprecated Override {@link #getOutputCoder()} instead. */
+  @Deprecated
+  public Coder<T> getDefaultOutputCoder() {
+    throw new UnsupportedOperationException("Source needs to override getOutputCoder()");
+  }
+
+  /** Returns the {@code Coder} to use for the data read from this source. */
+  public Coder<T> getOutputCoder() {
+    return getDefaultOutputCoder();
+  }
 
   /**
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 29b3e29..1b2e95b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -474,7 +474,7 @@ public class TFRecordIO {
     }
 
     @Override
-    public Coder<byte[]> getDefaultOutputCoder() {
+    public Coder<byte[]> getOutputCoder() {
       return DEFAULT_BYTE_ARRAY_CODER;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index 4d9fa77..86c73d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -69,7 +69,7 @@ class TextSource extends FileBasedSource<String> {
   }
 
   @Override
-  public Coder<String> getDefaultOutputCoder() {
+  public Coder<String> getOutputCoder() {
     return StringUtf8Coder.of();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index cde0b94..e147221 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -212,7 +212,7 @@ public class SourceTestUtils {
       List<? extends BoundedSource<T>> sources,
       PipelineOptions options)
       throws Exception {
-    Coder<T> coder = referenceSource.getDefaultOutputCoder();
+    Coder<T> coder = referenceSource.getOutputCoder();
     List<T> referenceRecords = readFromSource(referenceSource, options);
     List<T> bundleRecords = new ArrayList<>();
     for (BoundedSource<T> source : sources) {
@@ -221,7 +221,7 @@ public class SourceTestUtils {
               + source
               + " is not compatible with Coder type for referenceSource "
               + referenceSource,
-          source.getDefaultOutputCoder(),
+          source.getOutputCoder(),
           equalTo(coder));
       List<T> elems = readFromSource(source, options);
       bundleRecords.addAll(elems);
@@ -239,7 +239,7 @@ public class SourceTestUtils {
    */
   public static <T> void assertUnstartedReaderReadsSameAsItsSource(
       BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws Exception {
-    Coder<T> coder = reader.getCurrentSource().getDefaultOutputCoder();
+    Coder<T> coder = reader.getCurrentSource().getOutputCoder();
     List<T> expected = readFromUnstartedReader(reader);
     List<T> actual = readFromSource(reader.getCurrentSource(), options);
     List<ReadableStructuralValue<T>> expectedStructural = createStructuralValues(coder, expected);
@@ -415,7 +415,7 @@ public class SourceTestUtils {
               source,
               primary,
               residual);
-      Coder<T> coder = primary.getDefaultOutputCoder();
+      Coder<T> coder = primary.getOutputCoder();
       List<ReadableStructuralValue<T>> primaryValues =
           createStructuralValues(coder, primaryItems);
       List<ReadableStructuralValue<T>> currentValues =
@@ -728,8 +728,8 @@ public class SourceTestUtils {
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return boundedSource.getDefaultOutputCoder();
+    public Coder<T> getOutputCoder() {
+      return boundedSource.getOutputCoder();
     }
 
     private static class UnsplittableReader<T> extends BoundedReader<T> {

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 09e12ef..a28e9b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -411,7 +411,7 @@ public class Create<T> {
       public void validate() {}
 
       @Override
-      public Coder<T> getDefaultOutputCoder() {
+      public Coder<T> getOutputCoder() {
         return coder;
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index fa28e4b..fe6f01f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -638,7 +638,7 @@ public class CompressedSourceTest {
     }
 
     @Override
-    public Coder<Byte> getDefaultOutputCoder() {
+    public Coder<Byte> getOutputCoder() {
       return SerializableCoder.of(Byte.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index c15e667..1bdb915 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -107,7 +107,7 @@ public class FileBasedSourceTest {
     public void validate() {}
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index 25168a3..feda355 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -65,7 +65,7 @@ public class OffsetBasedSourceTest {
     public void validate() {}
 
     @Override
-    public Coder<Integer> getDefaultOutputCoder() {
+    public Coder<Integer> getOutputCoder() {
       return BigEndianIntegerCoder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
index 74acf18..4277dc3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
@@ -171,7 +171,7 @@ public class ReadTest implements Serializable{
     public void validate() {}
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
   }
@@ -207,7 +207,7 @@ public class ReadTest implements Serializable{
     public void validate() {}
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
index 9fcc3c5..338ea38 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
@@ -248,7 +248,7 @@ public class TestCountingSource
   public void validate() {}
 
   @Override
-  public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
+  public Coder<KV<Integer, Integer>> getOutputCoder() {
     return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 6a682ef..6be6772 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -487,12 +487,12 @@ public class CreateTest {
   }
 
   @Test
-  public void testSourceGetDefaultOutputCoderReturnsConstructorCoder() throws Exception {
+  public void testSourceGetOutputCoderReturnsConstructorCoder() throws Exception {
     Coder<Integer> coder = VarIntCoder.of();
     CreateSource<Integer> source =
         CreateSource.fromIterable(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), coder);
 
-    Coder<Integer> defaultCoder = source.getDefaultOutputCoder();
+    Coder<Integer> defaultCoder = source.getOutputCoder();
     assertThat(defaultCoder, equalTo(coder));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
index 1f307b2..508373f 100644
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
+++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
@@ -246,7 +246,7 @@ public class AmqpIO {
     }
 
     @Override
-    public Coder<Message> getDefaultOutputCoder() {
+    public Coder<Message> getOutputCoder() {
       return new AmqpMessageCoder();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 32905b7..eacc3e4 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -289,7 +289,7 @@ public class CassandraIO {
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
+    public Coder<T> getOutputCoder() {
       return spec.coder();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index acc7f2f..5046888 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -484,7 +484,7 @@ public class ElasticsearchIO {
     }
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 6c118a0..abe559c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -133,7 +133,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   }
 
   @Override
-  public Coder<TableRow> getDefaultOutputCoder() {
+  public Coder<TableRow> getOutputCoder() {
     return TableRowJsonCoder.of();
   }
 
@@ -184,7 +184,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
     for (ResourceId file : files) {
       avroSources.add(
-          AvroSource.from(file.toString()).withParseFn(function, getDefaultOutputCoder()));
+          AvroSource.from(file.toString()).withParseFn(function, getOutputCoder()));
     }
     return ImmutableList.copyOf(avroSources);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 0a90dde..c5b0fbf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -893,7 +893,7 @@ public class BigtableIO {
     }
 
     @Override
-    public Coder<Row> getDefaultOutputCoder() {
+    public Coder<Row> getOutputCoder() {
       return ProtoCoder.of(Row.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index b7df804..8da6ff4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1164,7 +1164,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     }
 
     @Override
-    public Coder<PubsubMessage> getDefaultOutputCoder() {
+    public Coder<PubsubMessage> getOutputCoder() {
       return outer.getNeedsAttributes()
           ? PubsubMessageWithAttributesCoder.of()
           : PubsubMessagePayloadOnlyCoder.of();

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 0b4c23f..20ca50a 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -552,7 +552,7 @@ public class HadoopInputFormatIO {
     }
 
     @Override
-    public Coder<KV<K, V>> getDefaultOutputCoder() {
+    public Coder<KV<K, V>> getOutputCoder() {
       return KvCoder.of(keyCoder, valueCoder);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 90ede4c..2ba6826 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -457,7 +457,7 @@ public class HBaseIO {
         }
 
         @Override
-        public Coder<Result> getDefaultOutputCoder() {
+        public Coder<Result> getOutputCoder() {
             return HBaseResultCoder.of();
         }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index 4199b80..d8e462b 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -210,7 +210,7 @@ public class HCatalogIO {
 
     @Override
     @SuppressWarnings({"unchecked", "rawtypes"})
-    public Coder<HCatRecord> getDefaultOutputCoder() {
+    public Coder<HCatRecord> getOutputCoder() {
       return (Coder) WritableCoder.of(DefaultHCatRecord.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index f8cba5e..2af0ce9 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -373,7 +373,7 @@ public class JmsIO {
     }
 
     @Override
-    public Coder<JmsRecord> getDefaultOutputCoder() {
+    public Coder<JmsRecord> getOutputCoder() {
       return SerializableCoder.of(JmsRecord.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 026313a..7fb4260 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -844,7 +844,7 @@ public class KafkaIO {
     }
 
     @Override
-    public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
+    public Coder<KafkaRecord<K, V>> getOutputCoder() {
       return KafkaRecordCoder.of(spec.getKeyCoder(), spec.getValueCoder());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 362792b..144bd80 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -107,7 +107,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
   }
 
   @Override
-  public Coder<KinesisRecord> getDefaultOutputCoder() {
+  public Coder<KinesisRecord> getOutputCoder() {
     return KinesisRecordCoder.of();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index 5b5412c..c612d52 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -440,7 +440,7 @@ public class MongoDbGridFSIO {
       }
 
       @Override
-      public Coder<ObjectId> getDefaultOutputCoder() {
+      public Coder<ObjectId> getOutputCoder() {
         return SerializableCoder.of(ObjectId.class);
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 3b14182..087123a 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -277,7 +277,7 @@ public class MongoDbIO {
     }
 
     @Override
-    public Coder<Document> getDefaultOutputCoder() {
+    public Coder<Document> getOutputCoder() {
       return SerializableCoder.of(Document.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index add5cb5..5aadb80 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -387,7 +387,7 @@ public class MqttIO {
     }
 
     @Override
-    public Coder<byte[]> getDefaultOutputCoder() {
+    public Coder<byte[]> getOutputCoder() {
       return ByteArrayCoder.of();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
index 7aa42c5..b893d43 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
@@ -85,7 +85,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
   }
 
   @Override
-  public Coder<T> getDefaultOutputCoder() {
+  public Coder<T> getOutputCoder() {
     return JAXBCoder.of(spec.getRecordClass());
   }
 


[5/6] beam git commit: Remembers the output coders of SplittableParDo

Posted by jk...@apache.org.
Remembers the output coders of SplittableParDo


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48690bc6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48690bc6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48690bc6

Branch: refs/heads/master
Commit: 48690bc61673e767d4a1fa72e0499c32f160db39
Parents: 95e2a00
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 17:35:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 17:35:03 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |  5 +-
 .../core/construction/SplittableParDo.java      | 65 ++++++++++----------
 .../core/construction/SplittableParDoTest.java  | 33 +++++-----
 .../core/SplittableParDoViaKeyedWorkItems.java  |  1 +
 .../direct/ParDoMultiOverrideFactory.java       |  2 +-
 .../dataflow/SplittableParDoOverrides.java      |  2 +-
 6 files changed, 56 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index cee524e..57d2593 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -379,8 +379,9 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
         AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
           transform) {
-      return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform),
-          SplittableParDo.forJavaParDo(transform.getTransform()));
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          SplittableParDo.forAppliedParDo(transform));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index bcc5de8..32d3409 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -74,6 +74,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   private final List<PCollectionView<?>> sideInputs;
   private final TupleTag<OutputT> mainOutputTag;
   private final TupleTagList additionalOutputTags;
+  private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;
 
   public static final String SPLITTABLE_PROCESS_URN =
       "urn:beam:runners_core:transforms:splittable_process:v1";
@@ -86,34 +87,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
 
   private SplittableParDo(
       DoFn<InputT, OutputT> doFn,
-      TupleTag<OutputT> mainOutputTag,
       List<PCollectionView<?>> sideInputs,
-      TupleTagList additionalOutputTags) {
+      TupleTag<OutputT> mainOutputTag,
+      TupleTagList additionalOutputTags,
+      Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
     checkArgument(
         DoFnSignatures.getSignature(doFn.getClass()).processElement().isSplittable(),
         "fn must be a splittable DoFn");
     this.doFn = doFn;
-    this.mainOutputTag = mainOutputTag;
     this.sideInputs = sideInputs;
+    this.mainOutputTag = mainOutputTag;
     this.additionalOutputTags = additionalOutputTags;
-  }
-
-  /**
-   * Creates a {@link SplittableParDo} from an original Java {@link ParDo}.
-   *
-   * @param parDo The splittable {@link ParDo} transform.
-   */
-  public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo(
-      ParDo.MultiOutput<InputT, OutputT> parDo) {
-    checkArgument(parDo != null, "parDo must not be null");
-    checkArgument(
-        DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
-        "fn must be a splittable DoFn");
-    return new SplittableParDo(
-        parDo.getFn(),
-        parDo.getMainOutputTag(),
-        parDo.getSideInputs(),
-        parDo.getAdditionalOutputTags());
+    this.outputTagsToCoders = outputTagsToCoders;
   }
 
   /**
@@ -122,15 +107,22 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
    * <p>The input may generally be a deserialized transform so it may not actually be a {@link
    * ParDo}. Instead {@link ParDoTranslation} will be used to extract fields.
    */
-  public static SplittableParDo<?, ?, ?> forAppliedParDo(AppliedPTransform<?, ?, ?> parDo) {
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forAppliedParDo(
+      AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> parDo) {
     checkArgument(parDo != null, "parDo must not be null");
 
     try {
-      return new SplittableParDo<>(
+      Map<TupleTag<?>, Coder<?>> outputTagsToCoders = Maps.newHashMap();
+      for (Map.Entry<TupleTag<?>, PValue> entry : parDo.getOutputs().entrySet()) {
+        outputTagsToCoders.put(entry.getKey(), ((PCollection) entry.getValue()).getCoder());
+      }
+      return new SplittableParDo(
           ParDoTranslation.getDoFn(parDo),
-          (TupleTag) ParDoTranslation.getMainOutputTag(parDo),
           ParDoTranslation.getSideInputs(parDo),
-          ParDoTranslation.getAdditionalOutputTags(parDo));
+          ParDoTranslation.getMainOutputTag(parDo),
+          ParDoTranslation.getAdditionalOutputTags(parDo),
+          outputTagsToCoders);
     } catch (IOException exc) {
       throw new RuntimeException(exc);
     }
@@ -169,7 +161,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
             (WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
             sideInputs,
             mainOutputTag,
-            additionalOutputTags));
+            additionalOutputTags,
+            outputTagsToCoders));
   }
 
   @Override
@@ -203,6 +196,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     private final List<PCollectionView<?>> sideInputs;
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList additionalOutputTags;
+    private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;
 
     /**
      * @param fn the splittable {@link DoFn}.
@@ -210,7 +204,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
      * @param sideInputs list of side inputs that should be available to the {@link DoFn}.
      * @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output.
      * @param additionalOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} additional
-     *     outputs.
+     * @param outputTagsToCoders A map from output tag to the coder for that output, which should
+     *     provide mappings for the main and all additional tags.
      */
     public ProcessKeyedElements(
         DoFn<InputT, OutputT> fn,
@@ -219,7 +214,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         WindowingStrategy<InputT, ?> windowingStrategy,
         List<PCollectionView<?>> sideInputs,
         TupleTag<OutputT> mainOutputTag,
-        TupleTagList additionalOutputTags) {
+        TupleTagList additionalOutputTags,
+        Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
       this.fn = fn;
       this.elementCoder = elementCoder;
       this.restrictionCoder = restrictionCoder;
@@ -227,6 +223,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       this.sideInputs = sideInputs;
       this.mainOutputTag = mainOutputTag;
       this.additionalOutputTags = additionalOutputTags;
+      this.outputTagsToCoders = outputTagsToCoders;
     }
 
     public DoFn<InputT, OutputT> getFn() {
@@ -257,10 +254,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       return additionalOutputTags;
     }
 
+    public Map<TupleTag<?>, Coder<?>> getOutputTagsToCoders() {
+      return outputTagsToCoders;
+    }
+
     @Override
     public PCollectionTuple expand(PCollection<KV<String, KV<InputT, RestrictionT>>> input) {
       return createPrimitiveOutputFor(
-          input, fn, mainOutputTag, additionalOutputTags, windowingStrategy);
+          input, fn, mainOutputTag, additionalOutputTags, outputTagsToCoders, windowingStrategy);
     }
 
     public static <OutputT> PCollectionTuple createPrimitiveOutputFor(
@@ -268,14 +269,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         DoFn<?, OutputT> fn,
         TupleTag<OutputT> mainOutputTag,
         TupleTagList additionalOutputTags,
+        Map<TupleTag<?>, Coder<?>> outputTagsToCoders,
         WindowingStrategy<?, ?> windowingStrategy) {
       DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
       PCollectionTuple outputs =
           PCollectionTuple.ofPrimitiveOutputsInternal(
               input.getPipeline(),
               TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
-              // TODO
-              Collections.<TupleTag<?>, Coder<?>>emptyMap(),
+              outputTagsToCoders,
               windowingStrategy,
               input.isBounded().and(signature.isBoundedPerElement()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index 267232c..05c471d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -29,6 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.junit.Rule;
@@ -106,12 +108,18 @@ public class SplittableParDoTest {
 
   private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<String>() {};
 
-  private ParDo.MultiOutput<Integer, String> makeParDo(DoFn<Integer, String> fn) {
-    return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
+  private PCollection<String> applySplittableParDo(
+      String name, PCollection<Integer> input, DoFn<Integer, String> fn) {
+    ParDo.MultiOutput<Integer, String> multiOutput =
+        ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
+    PCollectionTuple output = multiOutput.expand(input);
+    output.get(MAIN_OUTPUT_TAG).setName("main");
+    AppliedPTransform<PCollection<Integer>, PCollectionTuple, ?> transform =
+        AppliedPTransform.of("ParDo", input.expand(), output.expand(), multiOutput, pipeline);
+    return input.apply(name, SplittableParDo.forAppliedParDo(transform)).get(MAIN_OUTPUT_TAG);
   }
 
-  @Rule
-  public TestPipeline pipeline = TestPipeline.create();
+  @Rule public TestPipeline pipeline = TestPipeline.create();
 
   @Test
   public void testBoundednessForBoundedFn() {
@@ -121,16 +129,12 @@ public class SplittableParDoTest {
     assertEquals(
         "Applying a bounded SDF to a bounded collection produces a bounded collection",
         PCollection.IsBounded.BOUNDED,
-        makeBoundedCollection(pipeline)
-            .apply("bounded to bounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
-            .get(MAIN_OUTPUT_TAG)
+        applySplittableParDo("bounded to bounded", makeBoundedCollection(pipeline), boundedFn)
             .isBounded());
     assertEquals(
         "Applying a bounded SDF to an unbounded collection produces an unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
-        makeUnboundedCollection(pipeline)
-            .apply("bounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
-            .get(MAIN_OUTPUT_TAG)
+        applySplittableParDo("bounded to unbounded", makeUnboundedCollection(pipeline), boundedFn)
             .isBounded());
   }
 
@@ -142,16 +146,13 @@ public class SplittableParDoTest {
     assertEquals(
         "Applying an unbounded SDF to a bounded collection produces a bounded collection",
         PCollection.IsBounded.UNBOUNDED,
-        makeBoundedCollection(pipeline)
-            .apply("unbounded to bounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
-            .get(MAIN_OUTPUT_TAG)
+        applySplittableParDo("unbounded to bounded", makeBoundedCollection(pipeline), unboundedFn)
             .isBounded());
     assertEquals(
         "Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
-        makeUnboundedCollection(pipeline)
-            .apply("unbounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
-            .get(MAIN_OUTPUT_TAG)
+        applySplittableParDo(
+                "unbounded to unbounded", makeUnboundedCollection(pipeline), unboundedFn)
             .isBounded());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index af720fd..251260e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -184,6 +184,7 @@ public class SplittableParDoViaKeyedWorkItems {
           original.getFn(),
           original.getMainOutputTag(),
           original.getAdditionalOutputTags(),
+          original.getOutputTagsToCoders(),
           original.getInputWindowingStrategy());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 3f04b56..26f30b0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -96,7 +96,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
 
     if (signature.processElement().isSplittable()) {
-      return (PTransform) SplittableParDo.forAppliedParDo(application);
+      return SplittableParDo.forAppliedParDo((AppliedPTransform) application);
     } else if (signature.stateDeclarations().size() > 0
         || signature.timerDeclarations().size() > 0) {
       return new GbkThenStatefulParDo(

http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
index fc010f8..7b65950 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
@@ -64,7 +64,7 @@ class SplittableParDoOverrides {
             appliedTransform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(appliedTransform),
-          SplittableParDo.forJavaParDo(appliedTransform.getTransform()));
+          SplittableParDo.forAppliedParDo(appliedTransform));
     }
 
     @Override


[2/6] beam git commit: Requires specifying a Coder on PCollection.createPrimitiveOutputInternal

Posted by jk...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 4063d11..e8bf9b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -366,10 +366,15 @@ public class PCollection<T> extends PValueBase implements PValue {
   public static <T> PCollection<T> createPrimitiveOutputInternal(
       Pipeline pipeline,
       WindowingStrategy<?, ?> windowingStrategy,
-      IsBounded isBounded) {
-    return new PCollection<T>(pipeline)
+      IsBounded isBounded,
+      @Nullable Coder<T> coder) {
+    PCollection<T> res = new PCollection<T>(pipeline)
         .setWindowingStrategyInternal(windowingStrategy)
         .setIsBoundedInternal(isBounded);
+    if (coder != null) {
+      res.setCoder(coder);
+    }
+    return res;
   }
 
   private static class CoderOrFailure<T> {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index 793994f..9799d0e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -201,6 +202,7 @@ public class PCollectionTuple implements PInput, POutput {
   public static PCollectionTuple ofPrimitiveOutputsInternal(
       Pipeline pipeline,
       TupleTagList outputTags,
+      Map<TupleTag<?>, Coder<?>> coders,
       WindowingStrategy<?, ?> windowingStrategy,
       IsBounded isBounded) {
     Map<TupleTag<?>, PCollection<?>> pcollectionMap = new LinkedHashMap<>();
@@ -217,10 +219,10 @@ public class PCollectionTuple implements PInput, POutput {
       // erasure as the correct type. When a transform adds
       // elements to `outputCollection` they will be of type T.
       @SuppressWarnings("unchecked")
-      TypeDescriptor<Object> token = (TypeDescriptor<Object>) outputTag.getTypeDescriptor();
-      PCollection<Object> outputCollection = PCollection
-          .createPrimitiveOutputInternal(pipeline, windowingStrategy, isBounded)
-          .setTypeDescriptor(token);
+      PCollection outputCollection =
+          PCollection.createPrimitiveOutputInternal(
+                  pipeline, windowingStrategy, isBounded, coders.get(outputTag))
+              .setTypeDescriptor((TypeDescriptor) outputTag.getTypeDescriptor());
 
       pcollectionMap.put(outputTag, outputCollection);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 93650dd..12fe633 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
@@ -110,7 +111,7 @@ public class TransformHierarchyTest implements Serializable {
   public void emptyCompositeSucceeds() {
     PCollection<Long> created =
         PCollection.createPrimitiveOutputInternal(
-            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
     TransformHierarchy.Node node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
     hierarchy.setOutput(created);
     hierarchy.popNode();
@@ -139,7 +140,7 @@ public class TransformHierarchyTest implements Serializable {
   public void producingOwnAndOthersOutputsFails() {
     PCollection<Long> created =
         PCollection.createPrimitiveOutputInternal(
-            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
     hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
     hierarchy.setOutput(created);
     hierarchy.popNode();
@@ -147,8 +148,11 @@ public class TransformHierarchyTest implements Serializable {
 
     final PCollectionList<Long> appended =
         pcList.and(
-            PCollection.<Long>createPrimitiveOutputInternal(
-                    pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+            PCollection.createPrimitiveOutputInternal(
+                    pipeline,
+                    WindowingStrategy.globalDefault(),
+                    IsBounded.BOUNDED,
+                    VarLongCoder.of())
                 .setName("prim"));
     hierarchy.pushNode(
         "AddPc",
@@ -171,7 +175,7 @@ public class TransformHierarchyTest implements Serializable {
   public void producingOwnOutputWithCompositeFails() {
     final PCollection<Long> comp =
         PCollection.createPrimitiveOutputInternal(
-            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
     PTransform<PBegin, PCollection<Long>> root =
         new PTransform<PBegin, PCollection<Long>>() {
           @Override
@@ -327,7 +331,7 @@ public class TransformHierarchyTest implements Serializable {
 
     PCollection<Long> created =
         PCollection.createPrimitiveOutputInternal(
-            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
 
     SingleOutput<Long, Long> pardo =
         ParDo.of(
@@ -340,7 +344,7 @@ public class TransformHierarchyTest implements Serializable {
 
     PCollection<Long> mapped =
         PCollection.createPrimitiveOutputInternal(
-            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
 
     TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create);
     hierarchy.finishSpecifyingInput();
@@ -499,13 +503,11 @@ public class TransformHierarchyTest implements Serializable {
   @Test
   public void visitIsTopologicallyOrdered() {
     PCollection<String> one =
-        PCollection.<String>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
-            .setCoder(StringUtf8Coder.of());
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
     final PCollection<Integer> two =
-        PCollection.<Integer>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-            .setCoder(VarIntCoder.of());
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, VarIntCoder.of());
     final PDone done = PDone.in(pipeline);
     final TupleTag<String> oneTag = new TupleTag<String>() {};
     final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
@@ -617,13 +619,14 @@ public class TransformHierarchyTest implements Serializable {
   @Test
   public void visitDoesNotVisitSkippedNodes() {
     PCollection<String> one =
-        PCollection.<String>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
-            .setCoder(StringUtf8Coder.of());
+        PCollection.createPrimitiveOutputInternal(
+                pipeline,
+                WindowingStrategy.globalDefault(),
+                IsBounded.BOUNDED,
+                StringUtf8Coder.of());
     final PCollection<Integer> two =
-        PCollection.<Integer>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-            .setCoder(VarIntCoder.of());
+        PCollection.createPrimitiveOutputInternal(
+                pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, VarIntCoder.of());
     final PDone done = PDone.in(pipeline);
     final TupleTag<String> oneTag = new TupleTag<String>() {};
     final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index e7b680a..bf06d78 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.EnumSet;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
@@ -85,10 +86,13 @@ public class TransformTreeTest {
       // Issue below: PCollection.createPrimitiveOutput should not be used
       // from within a composite transform.
       return PCollectionList.of(
-          Arrays.asList(result, PCollection.<String>createPrimitiveOutputInternal(
-              b.getPipeline(),
-              WindowingStrategy.globalDefault(),
-              result.isBounded())));
+          Arrays.asList(
+              result,
+              PCollection.createPrimitiveOutputInternal(
+                  b.getPipeline(),
+                  WindowingStrategy.globalDefault(),
+                  result.isBounded(),
+                  StringUtf8Coder.of())));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index a8cb843..5dbe176 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -228,7 +228,7 @@ public class FlattenTest implements Serializable {
   public void testFlattenNoListsNoCoder() {
     // not ValidatesRunner because it should fail at pipeline construction time anyhow.
     thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("cannot provide a Coder for empty");
+    thrown.expectMessage("Unable to return a default Coder");
 
     PCollectionList.<ClassWithoutCoder>empty(p)
         .apply(Flatten.<ClassWithoutCoder>pCollections());

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 8fcb4c0..a76714c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matcher;
 import org.joda.time.Duration;
@@ -423,11 +422,11 @@ public class GroupByKeyTest implements Serializable {
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
               public PCollection<KV<String, Integer>> expand(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                        input.getPipeline(),
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+                return PCollection.createPrimitiveOutputInternal(
+                    input.getPipeline(),
+                    WindowingStrategy.globalDefault(),
+                    PCollection.IsBounded.UNBOUNDED,
+                    KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
               }
             });
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index cdd03d9..bfb8b5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -60,7 +60,6 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
@@ -1340,11 +1339,11 @@ public class ViewTest implements Serializable {
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
               public PCollection<KV<String, Integer>> expand(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                        input.getPipeline(),
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+                return PCollection.createPrimitiveOutputInternal(
+                    input.getPipeline(),
+                    WindowingStrategy.globalDefault(),
+                    PCollection.IsBounded.UNBOUNDED,
+                    KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
               }
             })
         .apply(view);

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 58e2bbd..33503b6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -59,9 +60,9 @@ public final class PCollectionTupleTest implements Serializable {
   @Test
   public void testOfThenHas() {
 
-    PCollection<Object> pCollection = PCollection.createPrimitiveOutputInternal(
-        pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-    TupleTag<Object> tag = new TupleTag<>();
+    PCollection<Integer> pCollection = PCollection.createPrimitiveOutputInternal(
+        pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
+    TupleTag<Integer> tag = new TupleTag<>();
 
     assertTrue(PCollectionTuple.of(tag, pCollection).has(tag));
   }


[3/6] beam git commit: Requires specifying a Coder on PCollection.createPrimitiveOutputInternal

Posted by jk...@apache.org.
Requires specifying a Coder on PCollection.createPrimitiveOutputInternal

The coder can still be null, in which case it is left unspecified.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bb1bf3c1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bb1bf3c1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bb1bf3c1

Branch: refs/heads/master
Commit: bb1bf3c19ca0baa2c04cec9863bfcaca2024f94e
Parents: e017a0e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 26 17:14:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 15:40:46 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |   5 +-
 .../FlattenPCollectionTranslator.java           |  15 +-
 .../apex/translation/ParDoTranslator.java       |   7 +-
 .../translation/ApexGroupByKeyOperatorTest.java |   6 +-
 .../construction/PCollectionTranslation.java    |   8 +-
 .../construction/PTransformReplacements.java    |   6 +
 .../core/construction/PrimitiveCreate.java      |  14 +-
 .../core/construction/SplittableParDo.java      |   3 +
 .../UnboundedReadFromBoundedSource.java         |   5 -
 .../construction/PTransformMatchersTest.java    |  79 +++++-----
 .../construction/ReplacementOutputsTest.java    |  14 +-
 .../core/GroupByKeyViaGroupByKeyOnly.java       |  15 +-
 .../core/SplittableParDoViaKeyedWorkItems.java  |   9 +-
 .../beam/runners/direct/DirectGroupByKey.java   |  31 ++--
 .../direct/ParDoMultiOverrideFactory.java       |   3 +
 .../direct/TestStreamEvaluatorFactory.java      |   8 +-
 .../runners/direct/ViewOverrideFactory.java     |   5 +-
 .../runners/direct/CommittedResultTest.java     |  26 +++-
 .../runners/direct/EvaluationContextTest.java   |   8 +-
 .../runners/flink/CreateStreamingFlinkView.java |   5 +-
 .../beam/runners/dataflow/AssignWindows.java    |   4 +-
 .../runners/dataflow/BatchViewOverrides.java    |  19 +--
 .../runners/dataflow/CreateDataflowView.java    |   5 +-
 .../beam/runners/dataflow/DataflowRunner.java   | 147 ++++++++++---------
 .../dataflow/PrimitiveParDoSingleFactory.java   |  12 +-
 .../DataflowPipelineTranslatorTest.java         |  11 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  11 +-
 .../transforms/DataflowGroupByKeyTest.java      |  12 +-
 .../dataflow/transforms/DataflowViewTest.java   |  14 +-
 .../beam/runners/spark/io/CreateStream.java     |  11 +-
 .../translation/StorageLevelPTransform.java     |  10 +-
 .../util/SinglePrimitiveOutputPTransform.java   |  51 -------
 .../main/java/org/apache/beam/sdk/io/Read.java  |  21 ++-
 .../org/apache/beam/sdk/testing/TestStream.java |   5 +-
 .../org/apache/beam/sdk/transforms/Flatten.java |  22 +--
 .../apache/beam/sdk/transforms/GroupByKey.java  |  12 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   2 +
 .../org/apache/beam/sdk/transforms/View.java    |   5 +-
 .../beam/sdk/transforms/windowing/Window.java   |   2 +-
 .../org/apache/beam/sdk/values/PCollection.java |   9 +-
 .../beam/sdk/values/PCollectionTuple.java       |  10 +-
 .../sdk/runners/TransformHierarchyTest.java     |  41 +++---
 .../beam/sdk/runners/TransformTreeTest.java     |  12 +-
 .../apache/beam/sdk/transforms/FlattenTest.java |   2 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |  11 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |  11 +-
 .../beam/sdk/values/PCollectionTupleTest.java   |   7 +-
 47 files changed, 357 insertions(+), 394 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index fd0a1c9..cee524e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -227,9 +227,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
     @Override
     public PCollection<ElemT> expand(PCollection<ElemT> input) {
-      return PCollection.<ElemT>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
     }
 
     public PCollectionView<ViewT> getView() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index 440b801..189cb65 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -110,8 +110,12 @@ class FlattenPCollectionTranslator<T> implements
           }
 
           if (collections.size() > 2) {
-            PCollection<T> intermediateCollection = intermediateCollection(collection,
-                collection.getCoder());
+            PCollection<T> intermediateCollection =
+                PCollection.createPrimitiveOutputInternal(
+                    collection.getPipeline(),
+                    collection.getWindowingStrategy(),
+                    collection.isBounded(),
+                    collection.getCoder());
             context.addOperator(operator, operator.out, intermediateCollection);
             remainingCollections.add(intermediateCollection);
           } else {
@@ -135,11 +139,4 @@ class FlattenPCollectionTranslator<T> implements
     }
   }
 
-  static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
-    PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-        input.getWindowingStrategy(), input.isBounded());
-    output.setCoder(outputCoder);
-    return output;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index e46687a..be11b02 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -241,8 +241,11 @@ class ParDoTranslator<InputT, OutputT>
     }
 
     PCollection<Object> resultCollection =
-        FlattenPCollectionTranslator.intermediateCollection(
-            firstSideInput, firstSideInput.getCoder());
+        PCollection.createPrimitiveOutputInternal(
+            firstSideInput.getPipeline(),
+            firstSideInput.getWindowingStrategy(),
+            firstSideInput.isBounded(),
+            firstSideInput.getCoder());
     FlattenPCollectionTranslator.flattenCollections(
         sourceCollections, unionTags, resultCollection, context);
     return resultCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
index 206b430..63a218b 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
@@ -59,9 +59,9 @@ public class ApexGroupByKeyOperatorTest {
 
     WindowingStrategy<?, ?> ws = WindowingStrategy.of(FixedWindows.of(
         Duration.standardSeconds(10)));
-    PCollection<KV<String, Integer>> input = PCollection.createPrimitiveOutputInternal(pipeline,
-        ws, IsBounded.BOUNDED);
-    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+    PCollection<KV<String, Integer>> input =
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, ws, IsBounded.BOUNDED, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
 
     ApexGroupByKeyOperator<String, Integer> operator = new ApexGroupByKeyOperator<>(options,
         input, new ApexStateInternals.ApexStateBackend()

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index c0a5acf..c256e4c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -52,10 +52,10 @@ public class PCollectionTranslation {
 
     Coder<?> coder = components.getCoder(pCollection.getCoderId());
     return PCollection.createPrimitiveOutputInternal(
-            pipeline,
-            components.getWindowingStrategy(pCollection.getWindowingStrategyId()),
-            fromProto(pCollection.getIsBounded()))
-        .setCoder((Coder) coder);
+        pipeline,
+        components.getWindowingStrategy(pCollection.getWindowingStrategyId()),
+        fromProto(pCollection.getIsBounded()),
+        (Coder) coder);
   }
 
   public static IsBounded isBounded(RunnerApi.PCollection pCollection) {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
index 706a956..35bad15 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.collect.Iterables;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -66,4 +67,9 @@ public class PTransformReplacements {
         ignoredTags);
     return mainInput;
   }
+
+  public static <T> PCollection<T> getSingletonMainOutput(
+      AppliedPTransform<?, PCollection<T>, ? extends PTransform<?, PCollection<T>>> transform) {
+    return ((PCollection<T>) Iterables.getOnlyElement(transform.getOutputs().values()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
index f43d23b..62b6d0a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
@@ -18,7 +18,9 @@
 
 package org.apache.beam.runners.core.construction;
 
+import com.google.common.collect.Iterables;
 import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.Create;
@@ -36,15 +38,17 @@ import org.apache.beam.sdk.values.WindowingStrategy;
  */
 public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> {
   private final Create.Values<T> transform;
+  private final Coder<T> coder;
 
-  private PrimitiveCreate(Create.Values<T> transform) {
+  private PrimitiveCreate(Create.Values<T> transform, Coder<T> coder) {
     this.transform = transform;
+    this.coder = coder;
   }
 
   @Override
   public PCollection<T> expand(PBegin input) {
     return PCollection.createPrimitiveOutputInternal(
-        input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+        input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED, coder);
   }
 
   public Iterable<T> getElements() {
@@ -60,7 +64,11 @@ public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> {
     public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
         AppliedPTransform<PBegin, PCollection<T>, Values<T>> transform) {
       return PTransformReplacement.of(
-          transform.getPipeline().begin(), new PrimitiveCreate<T>(transform.getTransform()));
+          transform.getPipeline().begin(),
+          new PrimitiveCreate<T>(
+              transform.getTransform(),
+              ((PCollection<T>) Iterables.getOnlyElement(transform.getOutputs().values()))
+                  .getCoder()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index e71187b..bcc5de8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -273,6 +274,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
           PCollectionTuple.ofPrimitiveOutputsInternal(
               input.getPipeline(),
               TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
+              // TODO
+              Collections.<TupleTag<?>, Coder<?>>emptyMap(),
               windowingStrategy,
               input.isBounded().and(signature.isBoundedPerElement()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index f35f4c3..55f9519 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -91,11 +91,6 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
   }
 
   @Override
-  protected Coder<T> getDefaultOutputCoder() {
-    return source.getOutputCoder();
-  }
-
-  @Override
   public String getKindString() {
     return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 1862699..fa7e1e9 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -100,15 +100,16 @@ public class PTransformMatchersTest implements Serializable {
   private AppliedPTransform<?, ?, ?> getAppliedTransform(PTransform pardo) {
     PCollection<KV<String, Integer>> input =
         PCollection.createPrimitiveOutputInternal(
-            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            p,
+            WindowingStrategy.globalDefault(),
+            IsBounded.BOUNDED,
+            KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
     input.setName("dummy input");
-    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
 
     PCollection<Integer> output =
         PCollection.createPrimitiveOutputInternal(
-            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
     output.setName("dummy output");
-    output.setCoder(VarIntCoder.of());
 
     return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p);
   }
@@ -133,7 +134,7 @@ public class PTransformMatchersTest implements Serializable {
       @Override
       public PCollection<Integer> expand(PCollection<KV<String, Integer>> input) {
         return PCollection.createPrimitiveOutputInternal(
-            input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+            input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), VarIntCoder.of());
       }
     }
     PTransformMatcher matcher = PTransformMatchers.classEqualTo(MyPTransform.class);
@@ -425,14 +426,14 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithEmptyFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+            .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
                 "EmptyFlatten",
                 Collections.<TupleTag<?>, PValue>emptyMap(),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.pCollections(),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                Flatten.<Integer>pCollections(),
                 p);
 
     assertThat(PTransformMatchers.emptyFlatten().matches(application), is(true));
@@ -442,17 +443,17 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithNonEmptyFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+            .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
                 "Flatten",
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.pCollections(),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                Flatten.<Integer>pCollections(),
                 p);
 
     assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
@@ -462,15 +463,15 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithNonFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
+            .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
                 "EmptyFlatten",
                 Collections.<TupleTag<?>, PValue>emptyMap(),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.iterables() /* This isn't actually possible to construct,
-                                 * but for the sake of example */,
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                /* This isn't actually possible to construct, but for the sake of example */
+                Flatten.<Integer>iterables(),
                 p);
 
     assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
@@ -480,17 +481,17 @@ public class PTransformMatchersTest implements Serializable {
   public void flattenWithDuplicateInputsWithoutDuplicates() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+            .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
                 "Flatten",
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.pCollections(),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                Flatten.<Integer>pCollections(),
                 p);
 
     assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
@@ -498,22 +499,22 @@ public class PTransformMatchersTest implements Serializable {
 
   @Test
   public void flattenWithDuplicateInputsWithDuplicates() {
-    PCollection<Object> duplicate =
+    PCollection<Integer> duplicate =
         PCollection.createPrimitiveOutputInternal(
-            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+            .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
                 "Flatten",
                 ImmutableMap.<TupleTag<?>, PValue>builder()
-                    .put(new TupleTag<Object>(), duplicate)
-                    .put(new TupleTag<Object>(), duplicate)
+                    .put(new TupleTag<Integer>(), duplicate)
+                    .put(new TupleTag<Integer>(), duplicate)
                     .build(),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.pCollections(),
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                Flatten.<Integer>pCollections(),
                 p);
 
     assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(true));
@@ -523,15 +524,15 @@ public class PTransformMatchersTest implements Serializable {
   public void flattenWithDuplicateInputsNonFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
+            .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
                 "EmptyFlatten",
                 Collections.<TupleTag<?>, PValue>emptyMap(),
                 Collections.<TupleTag<?>, PValue>singletonMap(
-                    new TupleTag<Object>(),
+                    new TupleTag<Integer>(),
                     PCollection.createPrimitiveOutputInternal(
-                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
-                Flatten.iterables() /* This isn't actually possible to construct,
-                                 * but for the sake of example */,
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+                /* This isn't actually possible to construct, but for the sake of example */
+                Flatten.<Integer>iterables(),
                 p);
 
     assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
index f8d01e9..0165e4b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertThat;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.util.Map;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -50,23 +52,23 @@ public class ReplacementOutputsTest {
 
   private PCollection<Integer> ints =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
   private PCollection<Integer> moreInts =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
   private PCollection<String> strs =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
 
   private PCollection<Integer> replacementInts =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
   private PCollection<Integer> moreReplacementInts =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
   private PCollection<String> replacementStrs =
       PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
 
   @Test
   public void singletonSucceeds() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
index fca3c76..1fdf07c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -111,12 +111,10 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
     @Override
     public PCollection<KV<K, Iterable<WindowedValue<V>>>> expand(PCollection<KV<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    }
-
-    @Override
-    public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
-      return GroupByKey.getOutputKvCoder(input.getCoder());
+          input.getPipeline(),
+          input.getWindowingStrategy(),
+          input.isBounded(),
+          (Coder) GroupByKey.getOutputKvCoder(input.getCoder()));
     }
   }
 
@@ -244,9 +242,8 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
       Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
       Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
 
-      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
-          input.getPipeline(), windowingStrategy, input.isBounded())
-          .setCoder(outputKvCoder);
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), windowingStrategy, input.isBounded(), outputKvCoder);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 6e97645..af720fd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -72,8 +72,15 @@ public class SplittableParDoViaKeyedWorkItems {
           PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
     @Override
     public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
+      KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          input.isBounded(),
+          KeyedWorkItemCoder.of(
+              kvCoder.getKeyCoder(),
+              kvCoder.getValueCoder(),
+              input.getWindowingStrategy().getWindowFn().windowCoder()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 06b8e29..3ba04e7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -78,22 +77,18 @@ class DirectGroupByKey<K, V>
     @Override
     public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          input.isBounded(),
+          KeyedWorkItemCoder.of(
+              GroupByKey.getKeyCoder(input.getCoder()),
+              GroupByKey.getInputValueCoder(input.getCoder()),
+              input.getWindowingStrategy().getWindowFn().windowCoder()));
     }
 
     DirectGroupByKeyOnly() {}
 
     @Override
-    protected Coder<?> getDefaultOutputCoder(
-        @SuppressWarnings("unused") PCollection<KV<K, V>> input)
-        throws CannotProvideCoderException {
-      return KeyedWorkItemCoder.of(
-          GroupByKey.getKeyCoder(input.getCoder()),
-          GroupByKey.getInputValueCoder(input.getCoder()),
-          input.getWindowingStrategy().getWindowFn().windowCoder());
-    }
-
-    @Override
     public String getUrn() {
       return DIRECT_GBKO_URN;
     }
@@ -135,17 +130,11 @@ class DirectGroupByKey<K, V>
     }
 
     @Override
-    protected Coder<?> getDefaultOutputCoder(
-        @SuppressWarnings("unused") PCollection<KeyedWorkItem<K, V>> input)
-        throws CannotProvideCoderException {
-      KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
-      return KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder()));
-    }
-
-    @Override
     public PCollection<KV<K, Iterable<V>>> expand(PCollection<KeyedWorkItem<K, V>> input) {
+      KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), outputWindowingStrategy, input.isBounded());
+          input.getPipeline(), outputWindowingStrategy, input.isBounded(),
+          KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder())));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 891d102..3f04b56 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
 import static com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
@@ -248,6 +249,8 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
           PCollectionTuple.ofPrimitiveOutputsInternal(
               input.getPipeline(),
               TupleTagList.of(getMainOutputTag()).and(getAdditionalOutputTags().getAll()),
+              // TODO
+              Collections.<TupleTag<?>, Coder<?>>emptyMap(),
               input.getWindowingStrategy(),
               input.isBounded());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 16c8589..49e7be7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -207,9 +207,11 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
       @Override
       public PCollection<T> expand(PBegin input) {
         runner.setClockSupplier(new TestClockSupplier());
-        return PCollection.<T>createPrimitiveOutputInternal(
-                input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-            .setCoder(original.getValueCoder());
+        return PCollection.createPrimitiveOutputInternal(
+            input.getPipeline(),
+            WindowingStrategy.globalDefault(),
+            IsBounded.UNBOUNDED,
+            original.getValueCoder());
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 5dcf016..c2255fe 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -115,9 +115,8 @@ class ViewOverrideFactory<ElemT, ViewT>
     @Override
     @SuppressWarnings("deprecation")
     public PCollection<Iterable<ElemT>> expand(PCollection<Iterable<ElemT>> input) {
-      return PCollection.<Iterable<ElemT>>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
     }
 
     @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 8b95b34..29ed55d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -113,13 +114,24 @@ public class CommittedResultTest implements Serializable {
 
   @Test
   public void getOutputsEqualInput() {
-    List<? extends CommittedBundle<?>> outputs =
-        ImmutableList.of(bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
-            WindowingStrategy.globalDefault(),
-            PCollection.IsBounded.BOUNDED)).commit(Instant.now()),
-            bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
-                WindowingStrategy.globalDefault(),
-                PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
+    List<? extends CommittedBundle<Integer>> outputs =
+        ImmutableList.of(
+            bundleFactory
+                .createBundle(
+                    PCollection.createPrimitiveOutputInternal(
+                        p,
+                        WindowingStrategy.globalDefault(),
+                        PCollection.IsBounded.BOUNDED,
+                        VarIntCoder.of()))
+                .commit(Instant.now()),
+            bundleFactory
+                .createBundle(
+                    PCollection.createPrimitiveOutputInternal(
+                        p,
+                        WindowingStrategy.globalDefault(),
+                        PCollection.IsBounded.UNBOUNDED,
+                        VarIntCoder.of()))
+                .commit(Instant.now()));
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 699a318..cc9ce60 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
@@ -127,8 +128,11 @@ public class EvaluationContextTest {
   public void writeToViewWriterThenReadReads() {
     PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter =
         context.createPCollectionViewWriter(
-            PCollection.<Iterable<Integer>>createPrimitiveOutputInternal(
-                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED),
+            PCollection.createPrimitiveOutputInternal(
+                p,
+                WindowingStrategy.globalDefault(),
+                IsBounded.BOUNDED,
+                IterableCoder.of(VarIntCoder.of())),
             view);
     BoundedWindow window = new TestBoundedWindow(new Instant(1024L));
     BoundedWindow second = new TestBoundedWindow(new Instant(899999L));

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
index 0cc3aec..3114a6f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
@@ -120,9 +120,8 @@ class CreateStreamingFlinkView<ElemT, ViewT>
 
     @Override
     public PCollection<List<ElemT>> expand(PCollection<List<ElemT>> input) {
-      return PCollection.<List<ElemT>>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
     }
 
     public PCollectionView<ViewT> getView() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
index 572b005..d015d2b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -59,8 +59,8 @@ class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
         transform.getOutputStrategyInternal(input.getWindowingStrategy());
     if (transform.getWindowFn() != null) {
       // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
-      return PCollection.<T>createPrimitiveOutputInternal(
-                            input.getPipeline(), outputStrategy, input.isBounded());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), outputStrategy, input.isBounded(), input.getCoder());
     } else {
       // If the windowFn didn't change, we just run a pass-through transform and then set the
       // new windowing strategy.

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ad3faed..9a77b4b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -1258,18 +1258,13 @@ class BatchViewOverrides {
 
     @Override
     public PCollection<KV<K1, Iterable<KV<K2, V>>>> expand(PCollection<KV<K1, KV<K2, V>>> input) {
-      PCollection<KV<K1, Iterable<KV<K2, V>>>> rval =
-          PCollection.<KV<K1, Iterable<KV<K2, V>>>>createPrimitiveOutputInternal(
-              input.getPipeline(),
-              WindowingStrategy.globalDefault(),
-              IsBounded.BOUNDED);
-
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      KvCoder<K1, KV<K2, V>> inputCoder = (KvCoder) input.getCoder();
-      rval.setCoder(
-          KvCoder.of(inputCoder.getKeyCoder(),
-              IterableCoder.of(inputCoder.getValueCoder())));
-      return rval;
+      @SuppressWarnings("unchecked")
+      KvCoder<K1, KV<K2, V>> inputCoder = (KvCoder<K1, KV<K2, V>>) input.getCoder();
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          IsBounded.BOUNDED,
+          KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index caad7f8..3b01d69 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -37,9 +37,8 @@ public class CreateDataflowView<ElemT, ViewT>
 
   @Override
   public PCollection<ElemT> expand(PCollection<ElemT> input) {
-    return PCollection.<ElemT>createPrimitiveOutputInternal(
-            input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-        .setCoder(input.getCoder());
+    return PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
   }
 
   public PCollectionView<ViewT> getView() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8fce5b4..6999616 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -321,7 +321,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         overridesBuilder.add(
             PTransformOverride.of(
                 PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
-                new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this)));
+                new StreamingPubsubIOReadOverrideFactory()));
       }
       if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
         overridesBuilder.add(
@@ -359,11 +359,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               // must precede it
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(Read.Bounded.class),
-                  new ReflectiveRootOverrideFactory(StreamingBoundedRead.class, this)))
+                  new StreamingBoundedReadOverrideFactory()))
           .add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(Read.Unbounded.class),
-                  new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this)))
+                  new StreamingUnboundedReadOverrideFactory()))
           .add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
@@ -448,38 +448,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  private static class ReflectiveRootOverrideFactory<T>
-      implements PTransformOverrideFactory<
-          PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> {
-    private final Class<PTransform<PBegin, PCollection<T>>> replacement;
-    private final DataflowRunner runner;
-
-    private ReflectiveRootOverrideFactory(
-        Class<PTransform<PBegin, PCollection<T>>> replacement, DataflowRunner runner) {
-      this.replacement = replacement;
-      this.runner = runner;
-    }
-
-    @Override
-    public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
-        AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) {
-      PTransform<PInput, PCollection<T>> original = transform.getTransform();
-      return PTransformReplacement.of(
-          transform.getPipeline().begin(),
-          InstanceBuilder.ofType(replacement)
-              .withArg(DataflowRunner.class, runner)
-              .withArg(
-                  (Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original)
-              .build());
-    }
-
-    @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
-      return ReplacementOutputs.singleton(outputs, newOutput);
-    }
-  }
-
   private String debuggerMessage(String projectId, String uniquifier) {
     return String.format("To debug your job, visit Google Cloud Debugger at: "
         + "https://console.developers.google.com/debug?project=%s&dbgee=%s",
@@ -838,6 +806,24 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // PubsubIO translations
   // ================================================================================
 
+  private static class StreamingPubsubIOReadOverrideFactory
+      implements PTransformOverrideFactory<
+          PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> {
+    @Override
+    public PTransformReplacement<PBegin, PCollection<PubsubMessage>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> transform) {
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(), new StreamingPubsubIORead(transform.getTransform()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollection<PubsubMessage> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+
   /**
    * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we can
    * instead defer to Windmill's implementation.
@@ -846,9 +832,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       extends PTransform<PBegin, PCollection<PubsubMessage>> {
     private final PubsubUnboundedSource transform;
 
-    /** Builds an instance of this class from the overridden transform. */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingPubsubIORead(DataflowRunner runner, PubsubUnboundedSource transform) {
+    public StreamingPubsubIORead(PubsubUnboundedSource transform) {
       this.transform = transform;
     }
 
@@ -858,9 +842,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     public PCollection<PubsubMessage> expand(PBegin input) {
-      return PCollection.<PubsubMessage>createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-          .setCoder(new PubsubMessageWithAttributesCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          IsBounded.UNBOUNDED,
+          new PubsubMessageWithAttributesCoder());
     }
 
     @Override
@@ -1129,12 +1115,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     @Override
     public PCollection<byte[]> expand(PBegin input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), isBounded);
-    }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder() {
-      return ByteArrayCoder.of();
+          input.getPipeline(), WindowingStrategy.globalDefault(), isBounded, ByteArrayCoder.of());
     }
 
     private static class Translator implements TransformTranslator<Impulse> {
@@ -1157,6 +1138,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
+  private static class StreamingUnboundedReadOverrideFactory<T>
+      implements PTransformOverrideFactory<PBegin, PCollection<T>, Read.Unbounded<T>> {
+    @Override
+    public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<T>, Read.Unbounded<T>> transform) {
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(), new StreamingUnboundedRead<>(transform.getTransform()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
   /**
    * Specialized implementation for
    * {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the
@@ -1168,18 +1165,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   private static class StreamingUnboundedRead<T> extends PTransform<PBegin, PCollection<T>> {
     private final UnboundedSource<T, ?> source;
 
-    /** Builds an instance of this class from the overridden transform. */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded<T> transform) {
+    public StreamingUnboundedRead(Read.Unbounded<T> transform) {
       this.source = transform.getSource();
     }
 
     @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return source.getOutputCoder();
-    }
-
-    @Override
     public final PCollection<T> expand(PBegin input) {
       source.validate();
 
@@ -1206,13 +1196,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
       @Override
       public final PCollection<ValueWithRecordId<T>> expand(PInput input) {
-        return PCollection.<ValueWithRecordId<T>>createPrimitiveOutputInternal(
-            input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
-      }
-
-      @Override
-      protected Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
-        return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder());
+        return PCollection.createPrimitiveOutputInternal(
+            input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED,
+            ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder()));
       }
 
       @Override
@@ -1276,6 +1262,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
+  private static class StreamingBoundedReadOverrideFactory<T>
+      implements PTransformOverrideFactory<PBegin, PCollection<T>, Read.Bounded<T>> {
+    @Override
+    public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<T>, Read.Bounded<T>> transform) {
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(), new StreamingBoundedRead<>(transform.getTransform()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
   /**
    * Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the
    * Dataflow runner in streaming mode.
@@ -1283,18 +1285,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   private static class StreamingBoundedRead<T> extends PTransform<PBegin, PCollection<T>> {
     private final BoundedSource<T> source;
 
-    /** Builds an instance of this class from the overridden transform. */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) {
+    public StreamingBoundedRead(Read.Bounded<T> transform) {
       this.source = transform.getSource();
     }
 
     @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return source.getOutputCoder();
-    }
-
-    @Override
     public final PCollection<T> expand(PBegin input) {
       source.validate();
 
@@ -1404,15 +1399,19 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   static class CombineGroupedValues<K, InputT, OutputT>
       extends PTransform<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>> {
     private final Combine.GroupedValues<K, InputT, OutputT> original;
+    private final Coder<KV<K, OutputT>> outputCoder;
 
-    CombineGroupedValues(GroupedValues<K, InputT, OutputT> original) {
+    CombineGroupedValues(
+        GroupedValues<K, InputT, OutputT> original, Coder<KV<K, OutputT>> outputCoder) {
       this.original = original;
+      this.outputCoder = outputCoder;
     }
 
     @Override
     public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, Iterable<InputT>>> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(),
+          outputCoder);
     }
 
     public Combine.GroupedValues<K, InputT, OutputT> getOriginalCombine() {
@@ -1433,7 +1432,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          new CombineGroupedValues<>(transform.getTransform()));
+          new CombineGroupedValues<>(
+              transform.getTransform(),
+              PTransformReplacements.getSingletonMainOutput(transform).getCoder()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 8611d3c..9252c64 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.DisplayData;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -49,7 +50,9 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
               transform) {
     return PTransformReplacement.of(
         PTransformReplacements.getSingletonMainInput(transform),
-        new ParDoSingle<>(transform.getTransform()));
+        new ParDoSingle<>(
+            transform.getTransform(),
+            PTransformReplacements.getSingletonMainOutput(transform).getCoder()));
   }
 
   /**
@@ -58,15 +61,18 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
   public static class ParDoSingle<InputT, OutputT>
       extends ForwardingPTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
     private final ParDo.SingleOutput<InputT, OutputT> original;
+    private final Coder<OutputT> outputCoder;
 
-    private ParDoSingle(ParDo.SingleOutput<InputT, OutputT> original) {
+    private ParDoSingle(SingleOutput<InputT, OutputT> original, Coder<OutputT> outputCoder) {
       this.original = original;
+      this.outputCoder = outputCoder;
     }
 
     @Override
     public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(),
+          outputCoder);
     }
 
     public DoFn<InputT, OutputT> getFn() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 9a0bdf8..7a99f75 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -650,10 +650,13 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
       // Fails here when attempting to construct a tuple with an unbound object.
       return PCollectionTuple.of(sumTag, sum)
-          .and(doneTag, PCollection.<Void>createPrimitiveOutputInternal(
-              input.getPipeline(),
-              WindowingStrategy.globalDefault(),
-              input.isBounded()));
+          .and(
+              doneTag,
+              PCollection.createPrimitiveOutputInternal(
+                  input.getPipeline(),
+                  WindowingStrategy.globalDefault(),
+                  input.isBounded(),
+                  VoidCoder.of()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 9db73c6..55264a1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -72,7 +72,6 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOption
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
@@ -953,15 +952,11 @@ public class DataflowRunnerTest implements Serializable {
 
     @Override
     public PCollection<Integer> expand(PCollection<Integer> input) {
-      return PCollection.<Integer>createPrimitiveOutputInternal(
+      return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(),
           WindowingStrategy.globalDefault(),
-          input.isBounded());
-    }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder(PCollection<Integer> input) {
-      return input.getCoder();
+          input.isBounded(),
+          input.getCoder());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index 737b408..c198ebf 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Duration;
 import org.junit.Before;
@@ -105,11 +105,11 @@ public class DataflowGroupByKeyTest {
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
               public PCollection<KV<String, Integer>> expand(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                        input.getPipeline(),
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+                return PCollection.createPrimitiveOutputInternal(
+                    input.getPipeline(),
+                    WindowingStrategy.globalDefault(),
+                    PCollection.IsBounded.UNBOUNDED,
+                    KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
               }
             });
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index dea96b9..e2e42a6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -21,6 +21,9 @@ import com.google.api.services.dataflow.Dataflow;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
@@ -33,7 +36,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
@@ -94,11 +96,11 @@ public class DataflowViewTest {
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
               public PCollection<KV<String, Integer>> expand(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                        input.getPipeline(),
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+                return PCollection.createPrimitiveOutputInternal(
+                    input.getPipeline(),
+                    WindowingStrategy.globalDefault(),
+                    PCollection.IsBounded.UNBOUNDED,
+                    KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
               }
             })
         .apply(view);

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index fdcea99..d485d25 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -27,7 +27,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -203,11 +202,9 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
   @Override
   public PCollection<T> expand(PBegin input) {
     return PCollection.createPrimitiveOutputInternal(
-        input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
-  }
-
-  @Override
-  protected Coder<T> getDefaultOutputCoder() throws CannotProvideCoderException {
-    return coder;
+        input.getPipeline(),
+        WindowingStrategy.globalDefault(),
+        PCollection.IsBounded.UNBOUNDED,
+        coder);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
index 0ecfa75..b236ce7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.spark.translation;
 
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -32,12 +31,7 @@ public final class StorageLevelPTransform extends PTransform<PCollection<?>, PCo
   public PCollection<String> expand(PCollection<?> input) {
     return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
         WindowingStrategy.globalDefault(),
-        PCollection.IsBounded.BOUNDED);
+        PCollection.IsBounded.BOUNDED,
+        StringUtf8Coder.of());
   }
-
-  @Override
-  public Coder getDefaultOutputCoder() {
-    return StringUtf8Coder.of();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
deleted file mode 100644
index 299f5ba..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.util;
-
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * A {@link PTransform} wrapping another transform.
- */
-public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PCollection<T>> {
-  private PTransform<PInput, PCollection<T>> transform;
-
-  public SinglePrimitiveOutputPTransform(PTransform<PInput, PCollection<T>> transform) {
-    this.transform = transform;
-  }
-
-  @Override
-  public PCollection<T> expand(PInput input) {
-    try {
-      PCollection<T> collection = PCollection.<T>createPrimitiveOutputInternal(
-              input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-      collection.setCoder(transform.getDefaultOutputCoder(input, collection));
-      return collection;
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalArgumentException(
-          "Unable to infer a coder and no Coder was specified. "
-              + "Please set a coder by invoking Create.withCoder() explicitly.",
-          e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 6e6750d..574ba0c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -95,17 +95,14 @@ public class Read {
     }
 
     @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return source.getOutputCoder();
-    }
-
-    @Override
     public final PCollection<T> expand(PBegin input) {
       source.validate();
 
-      return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
-          WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
-          .setCoder(getDefaultOutputCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          IsBounded.BOUNDED,
+          source.getOutputCoder());
     }
 
     /**
@@ -170,9 +167,11 @@ public class Read {
     @Override
     public final PCollection<T> expand(PBegin input) {
       source.validate();
-
-      return PCollection.<T>createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          IsBounded.UNBOUNDED,
+          source.getOutputCoder());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index d13fcf1..45f4413 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -253,9 +253,8 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
 
   @Override
   public PCollection<T> expand(PBegin input) {
-    return PCollection.<T>createPrimitiveOutputInternal(
-            input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-        .setCoder(coder);
+    return PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, coder);
   }
 
   public Coder<T> getValueCoder() {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 25d9c05..8247a58 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableLikeCoder;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -129,25 +128,12 @@ public class Flatten {
         windowingStrategy = WindowingStrategy.globalDefault();
       }
 
-      return PCollection.<T>createPrimitiveOutputInternal(
+      return PCollection.createPrimitiveOutputInternal(
           inputs.getPipeline(),
           windowingStrategy,
-          isBounded);
-    }
-
-    @Override
-    protected Coder<?> getDefaultOutputCoder(PCollectionList<T> input)
-        throws CannotProvideCoderException {
-
-      // Take coder from first collection
-      for (PCollection<T> pCollection : input.getAll()) {
-        return pCollection.getCoder();
-      }
-
-      // No inputs
-      throw new CannotProvideCoderException(
-          this.getClass().getSimpleName() + " cannot provide a Coder for"
-          + " empty " + PCollectionList.class.getSimpleName());
+          isBounded,
+          // Take coder from first collection. If there are none, will be left unspecified.
+          inputs.getAll().isEmpty() ? null : inputs.get(0).getCoder());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index 7516b25..3cb0d23 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -217,13 +217,11 @@ public class GroupByKey<K, V>
     // merging windows as needed, using the windows assigned to the
     // key/value input elements and the window merge operation of the
     // window function associated with the input PCollection.
-    return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-        updateWindowingStrategy(input.getWindowingStrategy()), input.isBounded());
-  }
-
-  @Override
-  protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
-    return getOutputKvCoder(input.getCoder());
+    return PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(),
+        updateWindowingStrategy(input.getWindowingStrategy()),
+        input.isBounded(),
+        getOutputKvCoder(input.getCoder()));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 0d03835..bc4f629 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -767,6 +767,8 @@ public class ParDo {
       PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
           input.getPipeline(),
           TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
+          // TODO
+          Collections.<TupleTag<?>, Coder<?>>emptyMap(),
           input.getWindowingStrategy(),
           input.isBounded());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 57dccbc..f6f3af5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -509,9 +509,8 @@ public class View {
 
     @Override
     public PCollection<ElemT> expand(PCollection<ElemT> input) {
-      return PCollection.<ElemT>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index a12be6d..af583e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -484,7 +484,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     @Override
     public PCollection<T> expand(PCollection<T> input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), updatedStrategy, input.isBounded());
+          input.getPipeline(), updatedStrategy, input.isBounded(), input.getCoder());
     }
 
     @Override