You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 01:02:29 UTC

[4/6] beam git commit: Makes all Source classes override getOutputCoder instead of getDefaultOutputCoder

Makes all Source classes override getOutputCoder instead of getDefaultOutputCoder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e017a0ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e017a0ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e017a0ec

Branch: refs/heads/master
Commit: e017a0ec8a16b63828d0955f405b23bc9771bc9e
Parents: 38f1890
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 26 16:05:27 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 15:40:46 2017 -0700

----------------------------------------------------------------------
 .../runners/apex/translation/utils/ValuesSource.java  |  2 +-
 .../runners/apex/examples/UnboundedTextSource.java    |  2 +-
 .../apex/translation/GroupByKeyTranslatorTest.java    |  2 +-
 .../apex/translation/utils/CollectionSource.java      |  2 +-
 .../construction/UnboundedReadFromBoundedSource.java  |  8 ++++----
 .../core/construction/ReadTranslationTest.java        |  4 ++--
 .../UnboundedReadFromBoundedSourceTest.java           |  2 +-
 .../direct/BoundedReadEvaluatorFactoryTest.java       |  2 +-
 .../apache/beam/runners/direct/DirectRunnerTest.java  |  4 ++--
 .../direct/UnboundedReadEvaluatorFactoryTest.java     |  2 +-
 .../wrappers/streaming/io/UnboundedSocketSource.java  |  2 +-
 .../runners/flink/streaming/TestCountingSource.java   |  2 +-
 .../apache/beam/runners/dataflow/DataflowRunner.java  |  6 +++---
 .../beam/runners/spark/io/MicrobatchSource.java       |  4 ++--
 .../beam/runners/spark/io/SparkUnboundedSource.java   |  2 +-
 .../runners/spark/stateful/StateSpecFunctions.java    |  2 +-
 .../main/java/org/apache/beam/sdk/io/AvroSource.java  |  2 +-
 .../beam/sdk/io/BoundedReadFromUnboundedSource.java   |  6 +++---
 .../java/org/apache/beam/sdk/io/CompressedSource.java |  6 +++---
 .../java/org/apache/beam/sdk/io/CountingSource.java   |  4 ++--
 .../src/main/java/org/apache/beam/sdk/io/Read.java    |  4 ++--
 .../src/main/java/org/apache/beam/sdk/io/Source.java  | 14 ++++++++++----
 .../main/java/org/apache/beam/sdk/io/TFRecordIO.java  |  2 +-
 .../main/java/org/apache/beam/sdk/io/TextSource.java  |  2 +-
 .../org/apache/beam/sdk/testing/SourceTestUtils.java  | 12 ++++++------
 .../java/org/apache/beam/sdk/transforms/Create.java   |  2 +-
 .../org/apache/beam/sdk/io/CompressedSourceTest.java  |  2 +-
 .../org/apache/beam/sdk/io/FileBasedSourceTest.java   |  2 +-
 .../org/apache/beam/sdk/io/OffsetBasedSourceTest.java |  2 +-
 .../test/java/org/apache/beam/sdk/io/ReadTest.java    |  4 ++--
 .../beam/sdk/runners/dataflow/TestCountingSource.java |  2 +-
 .../org/apache/beam/sdk/transforms/CreateTest.java    |  4 ++--
 .../main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java |  2 +-
 .../org/apache/beam/sdk/io/cassandra/CassandraIO.java |  2 +-
 .../beam/sdk/io/elasticsearch/ElasticsearchIO.java    |  2 +-
 .../beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java  |  4 ++--
 .../apache/beam/sdk/io/gcp/bigtable/BigtableIO.java   |  2 +-
 .../beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java |  2 +-
 .../io/hadoop/inputformat/HadoopInputFormatIO.java    |  2 +-
 .../java/org/apache/beam/sdk/io/hbase/HBaseIO.java    |  2 +-
 .../org/apache/beam/sdk/io/hcatalog/HCatalogIO.java   |  2 +-
 .../main/java/org/apache/beam/sdk/io/jms/JmsIO.java   |  2 +-
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java    |  2 +-
 .../org/apache/beam/sdk/io/kinesis/KinesisSource.java |  2 +-
 .../apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java   |  2 +-
 .../org/apache/beam/sdk/io/mongodb/MongoDbIO.java     |  2 +-
 .../main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java |  2 +-
 .../java/org/apache/beam/sdk/io/xml/XmlSource.java    |  2 +-
 48 files changed, 79 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
index 41f027f..4a00ff1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
@@ -81,7 +81,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
   }
 
   @Override
-  public Coder<T> getDefaultOutputCoder() {
+  public Coder<T> getOutputCoder() {
     return iterableCoder.getElemCoder();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
index c590a2e..8f3e6bc 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
@@ -59,7 +59,7 @@ public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource
   }
 
   @Override
-  public Coder<String> getDefaultOutputCoder() {
+  public Coder<String> getOutputCoder() {
     return StringUtf8Coder.of();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index 9c61b47..58f33ae 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -153,7 +153,7 @@ public class GroupByKeyTranslatorTest {
     }
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
index 288aade..01a2a85 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
@@ -67,7 +67,7 @@ public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.Chec
   }
 
   @Override
-  public Coder<T> getDefaultOutputCoder() {
+  public Coder<T> getOutputCoder() {
     return coder;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 24eb384..f35f4c3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -92,7 +92,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
 
   @Override
   protected Coder<T> getDefaultOutputCoder() {
-    return source.getDefaultOutputCoder();
+    return source.getOutputCoder();
   }
 
   @Override
@@ -166,14 +166,14 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return boundedSource.getDefaultOutputCoder();
+    public Coder<T> getOutputCoder() {
+      return boundedSource.getOutputCoder();
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     @Override
     public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
-      return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
+      return new CheckpointCoder<>(boundedSource.getOutputCoder());
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
index 740b324..f85bd79 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
@@ -112,7 +112,7 @@ public class ReadTranslationTest {
     public void validate() {}
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
 
@@ -132,7 +132,7 @@ public class ReadTranslationTest {
     public void validate() {}
 
     @Override
-    public Coder<byte[]> getDefaultOutputCoder() {
+    public Coder<byte[]> getOutputCoder() {
       return ByteArrayCoder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
index 0e48a9d..62b06b7 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
@@ -320,7 +320,7 @@ public class UnboundedReadFromBoundedSourceTest {
     }
 
     @Override
-    public Coder<Byte> getDefaultOutputCoder() {
+    public Coder<Byte> getOutputCoder() {
       return SerializableCoder.of(Byte.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 6180d29..3d81884 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -395,7 +395,7 @@ public class BoundedReadEvaluatorFactoryTest {
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
+    public Coder<T> getOutputCoder() {
       return coder;
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 943d27c..d3f407a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -573,8 +573,8 @@ public class DirectRunnerTest implements Serializable {
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return underlying.getDefaultOutputCoder();
+    public Coder<T> getOutputCoder() {
+      return underlying.getOutputCoder();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 2a01db5..cc6847d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -477,7 +477,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     public void validate() {}
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
+    public Coder<T> getOutputCoder() {
       return coder;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index 910a33f..49e4ddc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -123,7 +123,7 @@ public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.Check
   }
 
   @Override
-  public Coder getDefaultOutputCoder() {
+  public Coder<String> getOutputCoder() {
     return DEFAULT_SOCKET_CODER;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index edf548a..fcb9282 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -238,7 +238,7 @@ public class TestCountingSource
   public void validate() {}
 
   @Override
-  public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
+  public Coder<KV<Integer, Integer>> getOutputCoder() {
     return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index f8d2c3c..8fce5b4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1176,7 +1176,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
+      return source.getOutputCoder();
     }
 
     @Override
@@ -1212,7 +1212,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
       @Override
       protected Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
-        return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getDefaultOutputCoder());
+        return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder());
       }
 
       @Override
@@ -1291,7 +1291,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
+      return source.getOutputCoder();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 3b48caf..ae873a3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -140,8 +140,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
   }
 
   @Override
-  public Coder<T> getDefaultOutputCoder() {
-    return source.getDefaultOutputCoder();
+  public Coder<T> getOutputCoder() {
+    return source.getOutputCoder();
   }
 
   public Coder<CheckpointMarkT> getCheckpointMarkCoder() {

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index b31aa9f..26af0c0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -116,7 +116,7 @@ public class SparkUnboundedSource {
     // output the actual (deserialized) stream.
     WindowedValue.FullWindowedValueCoder<T> coder =
         WindowedValue.FullWindowedValueCoder.of(
-            source.getDefaultOutputCoder(),
+            source.getOutputCoder(),
             GlobalWindow.Coder.INSTANCE);
     JavaDStream<WindowedValue<T>> readUnboundedStream =
         mapWithStateDStream

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 1b54478..ca54715 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -161,7 +161,7 @@ public class StateSpecFunctions {
         final List<byte[]> readValues = new ArrayList<>();
         WindowedValue.FullWindowedValueCoder<T> coder =
             WindowedValue.FullWindowedValueCoder.of(
-                source.getDefaultOutputCoder(),
+                source.getOutputCoder(),
                 GlobalWindow.Coder.INSTANCE);
         try {
           // measure how long a read takes per-partition.

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index d277503..8dd3125 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -315,7 +315,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   }
 
   @Override
-  public Coder<T> getDefaultOutputCoder() {
+  public Coder<T> getOutputCoder() {
     return mode.getOutputCoder();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index c882447..8505ca4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -119,7 +119,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
 
   @Override
   protected Coder<T> getDefaultOutputCoder() {
-    return source.getDefaultOutputCoder();
+    return source.getOutputCoder();
   }
 
   @Override
@@ -211,8 +211,8 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
     }
 
     @Override
-    public Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
-      return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getDefaultOutputCoder());
+    public Coder<ValueWithRecordId<T>> getOutputCoder() {
+      return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getOutputCoder());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index ad81b61..6943a02 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -404,11 +404,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
   }
 
   /**
-   * Returns the delegate source's default output coder.
+   * Returns the delegate source's output coder.
    */
   @Override
-  public final Coder<T> getDefaultOutputCoder() {
-    return sourceDelegate.getDefaultOutputCoder();
+  public final Coder<T> getOutputCoder() {
+    return sourceDelegate.getOutputCoder();
   }
 
   public final DecompressingChannelFactory getChannelFactory() {

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index 6202c2b..b47edc7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -188,7 +188,7 @@ public class CountingSource {
     }
 
     @Override
-    public Coder<Long> getDefaultOutputCoder() {
+    public Coder<Long> getOutputCoder() {
       return VarLongCoder.of();
     }
 
@@ -364,7 +364,7 @@ public class CountingSource {
     public void validate() {}
 
     @Override
-    public Coder<Long> getDefaultOutputCoder() {
+    public Coder<Long> getOutputCoder() {
       return VarLongCoder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index a07fca8..6e6750d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -96,7 +96,7 @@ public class Read {
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
+      return source.getOutputCoder();
     }
 
     @Override
@@ -164,7 +164,7 @@ public class Read {
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
+      return source.getOutputCoder();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
index 542d91c..32a7270 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
@@ -61,10 +61,16 @@ public abstract class Source<T> implements Serializable, HasDisplayData {
    */
   public abstract void validate();
 
-  /**
-   * Returns the default {@code Coder} to use for the data read from this source.
-   */
-  public abstract Coder<T> getDefaultOutputCoder();
+  /** @deprecated Override {@link #getOutputCoder()} instead. */
+  @Deprecated
+  public Coder<T> getDefaultOutputCoder() {
+    throw new UnsupportedOperationException("Source needs to override getOutputCoder()");
+  }
+
+  /** Returns the {@code Coder} to use for the data read from this source. */
+  public Coder<T> getOutputCoder() {
+    return getDefaultOutputCoder();
+  }
 
   /**
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 29b3e29..1b2e95b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -474,7 +474,7 @@ public class TFRecordIO {
     }
 
     @Override
-    public Coder<byte[]> getDefaultOutputCoder() {
+    public Coder<byte[]> getOutputCoder() {
       return DEFAULT_BYTE_ARRAY_CODER;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index 4d9fa77..86c73d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -69,7 +69,7 @@ class TextSource extends FileBasedSource<String> {
   }
 
   @Override
-  public Coder<String> getDefaultOutputCoder() {
+  public Coder<String> getOutputCoder() {
     return StringUtf8Coder.of();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index cde0b94..e147221 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -212,7 +212,7 @@ public class SourceTestUtils {
       List<? extends BoundedSource<T>> sources,
       PipelineOptions options)
       throws Exception {
-    Coder<T> coder = referenceSource.getDefaultOutputCoder();
+    Coder<T> coder = referenceSource.getOutputCoder();
     List<T> referenceRecords = readFromSource(referenceSource, options);
     List<T> bundleRecords = new ArrayList<>();
     for (BoundedSource<T> source : sources) {
@@ -221,7 +221,7 @@ public class SourceTestUtils {
               + source
               + " is not compatible with Coder type for referenceSource "
               + referenceSource,
-          source.getDefaultOutputCoder(),
+          source.getOutputCoder(),
           equalTo(coder));
       List<T> elems = readFromSource(source, options);
       bundleRecords.addAll(elems);
@@ -239,7 +239,7 @@ public class SourceTestUtils {
    */
   public static <T> void assertUnstartedReaderReadsSameAsItsSource(
       BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws Exception {
-    Coder<T> coder = reader.getCurrentSource().getDefaultOutputCoder();
+    Coder<T> coder = reader.getCurrentSource().getOutputCoder();
     List<T> expected = readFromUnstartedReader(reader);
     List<T> actual = readFromSource(reader.getCurrentSource(), options);
     List<ReadableStructuralValue<T>> expectedStructural = createStructuralValues(coder, expected);
@@ -415,7 +415,7 @@ public class SourceTestUtils {
               source,
               primary,
               residual);
-      Coder<T> coder = primary.getDefaultOutputCoder();
+      Coder<T> coder = primary.getOutputCoder();
       List<ReadableStructuralValue<T>> primaryValues =
           createStructuralValues(coder, primaryItems);
       List<ReadableStructuralValue<T>> currentValues =
@@ -728,8 +728,8 @@ public class SourceTestUtils {
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return boundedSource.getDefaultOutputCoder();
+    public Coder<T> getOutputCoder() {
+      return boundedSource.getOutputCoder();
     }
 
     private static class UnsplittableReader<T> extends BoundedReader<T> {

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 09e12ef..a28e9b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -411,7 +411,7 @@ public class Create<T> {
       public void validate() {}
 
       @Override
-      public Coder<T> getDefaultOutputCoder() {
+      public Coder<T> getOutputCoder() {
         return coder;
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index fa28e4b..fe6f01f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -638,7 +638,7 @@ public class CompressedSourceTest {
     }
 
     @Override
-    public Coder<Byte> getDefaultOutputCoder() {
+    public Coder<Byte> getOutputCoder() {
       return SerializableCoder.of(Byte.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index c15e667..1bdb915 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -107,7 +107,7 @@ public class FileBasedSourceTest {
     public void validate() {}
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index 25168a3..feda355 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -65,7 +65,7 @@ public class OffsetBasedSourceTest {
     public void validate() {}
 
     @Override
-    public Coder<Integer> getDefaultOutputCoder() {
+    public Coder<Integer> getOutputCoder() {
       return BigEndianIntegerCoder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
index 74acf18..4277dc3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
@@ -171,7 +171,7 @@ public class ReadTest implements Serializable{
     public void validate() {}
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
   }
@@ -207,7 +207,7 @@ public class ReadTest implements Serializable{
     public void validate() {}
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
index 9fcc3c5..338ea38 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
@@ -248,7 +248,7 @@ public class TestCountingSource
   public void validate() {}
 
   @Override
-  public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
+  public Coder<KV<Integer, Integer>> getOutputCoder() {
     return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 6a682ef..6be6772 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -487,12 +487,12 @@ public class CreateTest {
   }
 
   @Test
-  public void testSourceGetDefaultOutputCoderReturnsConstructorCoder() throws Exception {
+  public void testSourceGetOutputCoderReturnsConstructorCoder() throws Exception {
     Coder<Integer> coder = VarIntCoder.of();
     CreateSource<Integer> source =
         CreateSource.fromIterable(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), coder);
 
-    Coder<Integer> defaultCoder = source.getDefaultOutputCoder();
+    Coder<Integer> defaultCoder = source.getOutputCoder();
     assertThat(defaultCoder, equalTo(coder));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
index 1f307b2..508373f 100644
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
+++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
@@ -246,7 +246,7 @@ public class AmqpIO {
     }
 
     @Override
-    public Coder<Message> getDefaultOutputCoder() {
+    public Coder<Message> getOutputCoder() {
       return new AmqpMessageCoder();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 32905b7..eacc3e4 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -289,7 +289,7 @@ public class CassandraIO {
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder() {
+    public Coder<T> getOutputCoder() {
       return spec.coder();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index acc7f2f..5046888 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -484,7 +484,7 @@ public class ElasticsearchIO {
     }
 
     @Override
-    public Coder<String> getDefaultOutputCoder() {
+    public Coder<String> getOutputCoder() {
       return StringUtf8Coder.of();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 6c118a0..abe559c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -133,7 +133,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   }
 
   @Override
-  public Coder<TableRow> getDefaultOutputCoder() {
+  public Coder<TableRow> getOutputCoder() {
     return TableRowJsonCoder.of();
   }
 
@@ -184,7 +184,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
     for (ResourceId file : files) {
       avroSources.add(
-          AvroSource.from(file.toString()).withParseFn(function, getDefaultOutputCoder()));
+          AvroSource.from(file.toString()).withParseFn(function, getOutputCoder()));
     }
     return ImmutableList.copyOf(avroSources);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 0a90dde..c5b0fbf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -893,7 +893,7 @@ public class BigtableIO {
     }
 
     @Override
-    public Coder<Row> getDefaultOutputCoder() {
+    public Coder<Row> getOutputCoder() {
       return ProtoCoder.of(Row.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index b7df804..8da6ff4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1164,7 +1164,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     }
 
     @Override
-    public Coder<PubsubMessage> getDefaultOutputCoder() {
+    public Coder<PubsubMessage> getOutputCoder() {
       return outer.getNeedsAttributes()
           ? PubsubMessageWithAttributesCoder.of()
           : PubsubMessagePayloadOnlyCoder.of();

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 0b4c23f..20ca50a 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -552,7 +552,7 @@ public class HadoopInputFormatIO {
     }
 
     @Override
-    public Coder<KV<K, V>> getDefaultOutputCoder() {
+    public Coder<KV<K, V>> getOutputCoder() {
       return KvCoder.of(keyCoder, valueCoder);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 90ede4c..2ba6826 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -457,7 +457,7 @@ public class HBaseIO {
         }
 
         @Override
-        public Coder<Result> getDefaultOutputCoder() {
+        public Coder<Result> getOutputCoder() {
             return HBaseResultCoder.of();
         }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index 4199b80..d8e462b 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -210,7 +210,7 @@ public class HCatalogIO {
 
     @Override
     @SuppressWarnings({"unchecked", "rawtypes"})
-    public Coder<HCatRecord> getDefaultOutputCoder() {
+    public Coder<HCatRecord> getOutputCoder() {
       return (Coder) WritableCoder.of(DefaultHCatRecord.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index f8cba5e..2af0ce9 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -373,7 +373,7 @@ public class JmsIO {
     }
 
     @Override
-    public Coder<JmsRecord> getDefaultOutputCoder() {
+    public Coder<JmsRecord> getOutputCoder() {
       return SerializableCoder.of(JmsRecord.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 026313a..7fb4260 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -844,7 +844,7 @@ public class KafkaIO {
     }
 
     @Override
-    public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
+    public Coder<KafkaRecord<K, V>> getOutputCoder() {
       return KafkaRecordCoder.of(spec.getKeyCoder(), spec.getValueCoder());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 362792b..144bd80 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -107,7 +107,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
   }
 
   @Override
-  public Coder<KinesisRecord> getDefaultOutputCoder() {
+  public Coder<KinesisRecord> getOutputCoder() {
     return KinesisRecordCoder.of();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index 5b5412c..c612d52 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -440,7 +440,7 @@ public class MongoDbGridFSIO {
       }
 
       @Override
-      public Coder<ObjectId> getDefaultOutputCoder() {
+      public Coder<ObjectId> getOutputCoder() {
         return SerializableCoder.of(ObjectId.class);
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 3b14182..087123a 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -277,7 +277,7 @@ public class MongoDbIO {
     }
 
     @Override
-    public Coder<Document> getDefaultOutputCoder() {
+    public Coder<Document> getOutputCoder() {
       return SerializableCoder.of(Document.class);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index add5cb5..5aadb80 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -387,7 +387,7 @@ public class MqttIO {
     }
 
     @Override
-    public Coder<byte[]> getDefaultOutputCoder() {
+    public Coder<byte[]> getOutputCoder() {
       return ByteArrayCoder.of();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
index 7aa42c5..b893d43 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
@@ -85,7 +85,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
   }
 
   @Override
-  public Coder<T> getDefaultOutputCoder() {
+  public Coder<T> getOutputCoder() {
     return JAXBCoder.of(spec.getRecordClass());
   }