You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 01:02:29 UTC
[4/6] beam git commit: Makes all Source classes override
getOutputCoder instead of getDefaultOutputCoder
Makes all Source classes override getOutputCoder instead of getDefaultOutputCoder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e017a0ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e017a0ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e017a0ec
Branch: refs/heads/master
Commit: e017a0ec8a16b63828d0955f405b23bc9771bc9e
Parents: 38f1890
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 26 16:05:27 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 15:40:46 2017 -0700
----------------------------------------------------------------------
.../runners/apex/translation/utils/ValuesSource.java | 2 +-
.../runners/apex/examples/UnboundedTextSource.java | 2 +-
.../apex/translation/GroupByKeyTranslatorTest.java | 2 +-
.../apex/translation/utils/CollectionSource.java | 2 +-
.../construction/UnboundedReadFromBoundedSource.java | 8 ++++----
.../core/construction/ReadTranslationTest.java | 4 ++--
.../UnboundedReadFromBoundedSourceTest.java | 2 +-
.../direct/BoundedReadEvaluatorFactoryTest.java | 2 +-
.../apache/beam/runners/direct/DirectRunnerTest.java | 4 ++--
.../direct/UnboundedReadEvaluatorFactoryTest.java | 2 +-
.../wrappers/streaming/io/UnboundedSocketSource.java | 2 +-
.../runners/flink/streaming/TestCountingSource.java | 2 +-
.../apache/beam/runners/dataflow/DataflowRunner.java | 6 +++---
.../beam/runners/spark/io/MicrobatchSource.java | 4 ++--
.../beam/runners/spark/io/SparkUnboundedSource.java | 2 +-
.../runners/spark/stateful/StateSpecFunctions.java | 2 +-
.../main/java/org/apache/beam/sdk/io/AvroSource.java | 2 +-
.../beam/sdk/io/BoundedReadFromUnboundedSource.java | 6 +++---
.../java/org/apache/beam/sdk/io/CompressedSource.java | 6 +++---
.../java/org/apache/beam/sdk/io/CountingSource.java | 4 ++--
.../src/main/java/org/apache/beam/sdk/io/Read.java | 4 ++--
.../src/main/java/org/apache/beam/sdk/io/Source.java | 14 ++++++++++----
.../main/java/org/apache/beam/sdk/io/TFRecordIO.java | 2 +-
.../main/java/org/apache/beam/sdk/io/TextSource.java | 2 +-
.../org/apache/beam/sdk/testing/SourceTestUtils.java | 12 ++++++------
.../java/org/apache/beam/sdk/transforms/Create.java | 2 +-
.../org/apache/beam/sdk/io/CompressedSourceTest.java | 2 +-
.../org/apache/beam/sdk/io/FileBasedSourceTest.java | 2 +-
.../org/apache/beam/sdk/io/OffsetBasedSourceTest.java | 2 +-
.../test/java/org/apache/beam/sdk/io/ReadTest.java | 4 ++--
.../beam/sdk/runners/dataflow/TestCountingSource.java | 2 +-
.../org/apache/beam/sdk/transforms/CreateTest.java | 4 ++--
.../main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java | 2 +-
.../org/apache/beam/sdk/io/cassandra/CassandraIO.java | 2 +-
.../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 2 +-
.../beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java | 4 ++--
.../apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 2 +-
.../beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 2 +-
.../io/hadoop/inputformat/HadoopInputFormatIO.java | 2 +-
.../java/org/apache/beam/sdk/io/hbase/HBaseIO.java | 2 +-
.../org/apache/beam/sdk/io/hcatalog/HCatalogIO.java | 2 +-
.../main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 2 +-
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +-
.../org/apache/beam/sdk/io/kinesis/KinesisSource.java | 2 +-
.../apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java | 2 +-
.../org/apache/beam/sdk/io/mongodb/MongoDbIO.java | 2 +-
.../main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java | 2 +-
.../java/org/apache/beam/sdk/io/xml/XmlSource.java | 2 +-
48 files changed, 79 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
index 41f027f..4a00ff1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
@@ -81,7 +81,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return iterableCoder.getElemCoder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
index c590a2e..8f3e6bc 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
@@ -59,7 +59,7 @@ public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource
}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index 9c61b47..58f33ae 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -153,7 +153,7 @@ public class GroupByKeyTranslatorTest {
}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
index 288aade..01a2a85 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
@@ -67,7 +67,7 @@ public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.Chec
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 24eb384..f35f4c3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -92,7 +92,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
@@ -166,14 +166,14 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
}
@Override
- public Coder<T> getDefaultOutputCoder() {
- return boundedSource.getDefaultOutputCoder();
+ public Coder<T> getOutputCoder() {
+ return boundedSource.getOutputCoder();
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
- return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
+ return new CheckpointCoder<>(boundedSource.getOutputCoder());
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
index 740b324..f85bd79 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
@@ -112,7 +112,7 @@ public class ReadTranslationTest {
public void validate() {}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
@@ -132,7 +132,7 @@ public class ReadTranslationTest {
public void validate() {}
@Override
- public Coder<byte[]> getDefaultOutputCoder() {
+ public Coder<byte[]> getOutputCoder() {
return ByteArrayCoder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
index 0e48a9d..62b06b7 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
@@ -320,7 +320,7 @@ public class UnboundedReadFromBoundedSourceTest {
}
@Override
- public Coder<Byte> getDefaultOutputCoder() {
+ public Coder<Byte> getOutputCoder() {
return SerializableCoder.of(Byte.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 6180d29..3d81884 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -395,7 +395,7 @@ public class BoundedReadEvaluatorFactoryTest {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return coder;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 943d27c..d3f407a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -573,8 +573,8 @@ public class DirectRunnerTest implements Serializable {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
- return underlying.getDefaultOutputCoder();
+ public Coder<T> getOutputCoder() {
+ return underlying.getOutputCoder();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 2a01db5..cc6847d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -477,7 +477,7 @@ public class UnboundedReadEvaluatorFactoryTest {
public void validate() {}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index 910a33f..49e4ddc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -123,7 +123,7 @@ public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.Check
}
@Override
- public Coder getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return DEFAULT_SOCKET_CODER;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index edf548a..fcb9282 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -238,7 +238,7 @@ public class TestCountingSource
public void validate() {}
@Override
- public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
+ public Coder<KV<Integer, Integer>> getOutputCoder() {
return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index f8d2c3c..8fce5b4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1176,7 +1176,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
@@ -1212,7 +1212,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
protected Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
- return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getDefaultOutputCoder());
+ return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder());
}
@Override
@@ -1291,7 +1291,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 3b48caf..ae873a3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -140,8 +140,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
}
@Override
- public Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ public Coder<T> getOutputCoder() {
+ return source.getOutputCoder();
}
public Coder<CheckpointMarkT> getCheckpointMarkCoder() {
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index b31aa9f..26af0c0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -116,7 +116,7 @@ public class SparkUnboundedSource {
// output the actual (deserialized) stream.
WindowedValue.FullWindowedValueCoder<T> coder =
WindowedValue.FullWindowedValueCoder.of(
- source.getDefaultOutputCoder(),
+ source.getOutputCoder(),
GlobalWindow.Coder.INSTANCE);
JavaDStream<WindowedValue<T>> readUnboundedStream =
mapWithStateDStream
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 1b54478..ca54715 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -161,7 +161,7 @@ public class StateSpecFunctions {
final List<byte[]> readValues = new ArrayList<>();
WindowedValue.FullWindowedValueCoder<T> coder =
WindowedValue.FullWindowedValueCoder.of(
- source.getDefaultOutputCoder(),
+ source.getOutputCoder(),
GlobalWindow.Coder.INSTANCE);
try {
// measure how long a read takes per-partition.
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index d277503..8dd3125 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -315,7 +315,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return mode.getOutputCoder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index c882447..8505ca4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -119,7 +119,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
@@ -211,8 +211,8 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
}
@Override
- public Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
- return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getDefaultOutputCoder());
+ public Coder<ValueWithRecordId<T>> getOutputCoder() {
+ return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getOutputCoder());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index ad81b61..6943a02 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -404,11 +404,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
}
/**
- * Returns the delegate source's default output coder.
+ * Returns the delegate source's output coder.
*/
@Override
- public final Coder<T> getDefaultOutputCoder() {
- return sourceDelegate.getDefaultOutputCoder();
+ public final Coder<T> getOutputCoder() {
+ return sourceDelegate.getOutputCoder();
}
public final DecompressingChannelFactory getChannelFactory() {
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index 6202c2b..b47edc7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -188,7 +188,7 @@ public class CountingSource {
}
@Override
- public Coder<Long> getDefaultOutputCoder() {
+ public Coder<Long> getOutputCoder() {
return VarLongCoder.of();
}
@@ -364,7 +364,7 @@ public class CountingSource {
public void validate() {}
@Override
- public Coder<Long> getDefaultOutputCoder() {
+ public Coder<Long> getOutputCoder() {
return VarLongCoder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index a07fca8..6e6750d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -96,7 +96,7 @@ public class Read {
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
@@ -164,7 +164,7 @@ public class Read {
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
index 542d91c..32a7270 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
@@ -61,10 +61,16 @@ public abstract class Source<T> implements Serializable, HasDisplayData {
*/
public abstract void validate();
- /**
- * Returns the default {@code Coder} to use for the data read from this source.
- */
- public abstract Coder<T> getDefaultOutputCoder();
+ /** @deprecated Override {@link #getOutputCoder()} instead. */
+ @Deprecated
+ public Coder<T> getDefaultOutputCoder() {
+ throw new UnsupportedOperationException("Source needs to override getOutputCoder()");
+ }
+
+ /** Returns the {@code Coder} to use for the data read from this source. */
+ public Coder<T> getOutputCoder() {
+ return getDefaultOutputCoder();
+ }
/**
* {@inheritDoc}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 29b3e29..1b2e95b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -474,7 +474,7 @@ public class TFRecordIO {
}
@Override
- public Coder<byte[]> getDefaultOutputCoder() {
+ public Coder<byte[]> getOutputCoder() {
return DEFAULT_BYTE_ARRAY_CODER;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index 4d9fa77..86c73d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -69,7 +69,7 @@ class TextSource extends FileBasedSource<String> {
}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index cde0b94..e147221 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -212,7 +212,7 @@ public class SourceTestUtils {
List<? extends BoundedSource<T>> sources,
PipelineOptions options)
throws Exception {
- Coder<T> coder = referenceSource.getDefaultOutputCoder();
+ Coder<T> coder = referenceSource.getOutputCoder();
List<T> referenceRecords = readFromSource(referenceSource, options);
List<T> bundleRecords = new ArrayList<>();
for (BoundedSource<T> source : sources) {
@@ -221,7 +221,7 @@ public class SourceTestUtils {
+ source
+ " is not compatible with Coder type for referenceSource "
+ referenceSource,
- source.getDefaultOutputCoder(),
+ source.getOutputCoder(),
equalTo(coder));
List<T> elems = readFromSource(source, options);
bundleRecords.addAll(elems);
@@ -239,7 +239,7 @@ public class SourceTestUtils {
*/
public static <T> void assertUnstartedReaderReadsSameAsItsSource(
BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws Exception {
- Coder<T> coder = reader.getCurrentSource().getDefaultOutputCoder();
+ Coder<T> coder = reader.getCurrentSource().getOutputCoder();
List<T> expected = readFromUnstartedReader(reader);
List<T> actual = readFromSource(reader.getCurrentSource(), options);
List<ReadableStructuralValue<T>> expectedStructural = createStructuralValues(coder, expected);
@@ -415,7 +415,7 @@ public class SourceTestUtils {
source,
primary,
residual);
- Coder<T> coder = primary.getDefaultOutputCoder();
+ Coder<T> coder = primary.getOutputCoder();
List<ReadableStructuralValue<T>> primaryValues =
createStructuralValues(coder, primaryItems);
List<ReadableStructuralValue<T>> currentValues =
@@ -728,8 +728,8 @@ public class SourceTestUtils {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
- return boundedSource.getDefaultOutputCoder();
+ public Coder<T> getOutputCoder() {
+ return boundedSource.getOutputCoder();
}
private static class UnsplittableReader<T> extends BoundedReader<T> {
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 09e12ef..a28e9b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -411,7 +411,7 @@ public class Create<T> {
public void validate() {}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index fa28e4b..fe6f01f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -638,7 +638,7 @@ public class CompressedSourceTest {
}
@Override
- public Coder<Byte> getDefaultOutputCoder() {
+ public Coder<Byte> getOutputCoder() {
return SerializableCoder.of(Byte.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index c15e667..1bdb915 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -107,7 +107,7 @@ public class FileBasedSourceTest {
public void validate() {}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index 25168a3..feda355 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -65,7 +65,7 @@ public class OffsetBasedSourceTest {
public void validate() {}
@Override
- public Coder<Integer> getDefaultOutputCoder() {
+ public Coder<Integer> getOutputCoder() {
return BigEndianIntegerCoder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
index 74acf18..4277dc3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
@@ -171,7 +171,7 @@ public class ReadTest implements Serializable{
public void validate() {}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
}
@@ -207,7 +207,7 @@ public class ReadTest implements Serializable{
public void validate() {}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
index 9fcc3c5..338ea38 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
@@ -248,7 +248,7 @@ public class TestCountingSource
public void validate() {}
@Override
- public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
+ public Coder<KV<Integer, Integer>> getOutputCoder() {
return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 6a682ef..6be6772 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -487,12 +487,12 @@ public class CreateTest {
}
@Test
- public void testSourceGetDefaultOutputCoderReturnsConstructorCoder() throws Exception {
+ public void testSourceGetOutputCoderReturnsConstructorCoder() throws Exception {
Coder<Integer> coder = VarIntCoder.of();
CreateSource<Integer> source =
CreateSource.fromIterable(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), coder);
- Coder<Integer> defaultCoder = source.getDefaultOutputCoder();
+ Coder<Integer> defaultCoder = source.getOutputCoder();
assertThat(defaultCoder, equalTo(coder));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
index 1f307b2..508373f 100644
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
+++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
@@ -246,7 +246,7 @@ public class AmqpIO {
}
@Override
- public Coder<Message> getDefaultOutputCoder() {
+ public Coder<Message> getOutputCoder() {
return new AmqpMessageCoder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 32905b7..eacc3e4 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -289,7 +289,7 @@ public class CassandraIO {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return spec.coder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index acc7f2f..5046888 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -484,7 +484,7 @@ public class ElasticsearchIO {
}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 6c118a0..abe559c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -133,7 +133,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
}
@Override
- public Coder<TableRow> getDefaultOutputCoder() {
+ public Coder<TableRow> getOutputCoder() {
return TableRowJsonCoder.of();
}
@@ -184,7 +184,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
for (ResourceId file : files) {
avroSources.add(
- AvroSource.from(file.toString()).withParseFn(function, getDefaultOutputCoder()));
+ AvroSource.from(file.toString()).withParseFn(function, getOutputCoder()));
}
return ImmutableList.copyOf(avroSources);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 0a90dde..c5b0fbf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -893,7 +893,7 @@ public class BigtableIO {
}
@Override
- public Coder<Row> getDefaultOutputCoder() {
+ public Coder<Row> getOutputCoder() {
return ProtoCoder.of(Row.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index b7df804..8da6ff4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1164,7 +1164,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
}
@Override
- public Coder<PubsubMessage> getDefaultOutputCoder() {
+ public Coder<PubsubMessage> getOutputCoder() {
return outer.getNeedsAttributes()
? PubsubMessageWithAttributesCoder.of()
: PubsubMessagePayloadOnlyCoder.of();
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 0b4c23f..20ca50a 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -552,7 +552,7 @@ public class HadoopInputFormatIO {
}
@Override
- public Coder<KV<K, V>> getDefaultOutputCoder() {
+ public Coder<KV<K, V>> getOutputCoder() {
return KvCoder.of(keyCoder, valueCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 90ede4c..2ba6826 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -457,7 +457,7 @@ public class HBaseIO {
}
@Override
- public Coder<Result> getDefaultOutputCoder() {
+ public Coder<Result> getOutputCoder() {
return HBaseResultCoder.of();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index 4199b80..d8e462b 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -210,7 +210,7 @@ public class HCatalogIO {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
- public Coder<HCatRecord> getDefaultOutputCoder() {
+ public Coder<HCatRecord> getOutputCoder() {
return (Coder) WritableCoder.of(DefaultHCatRecord.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index f8cba5e..2af0ce9 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -373,7 +373,7 @@ public class JmsIO {
}
@Override
- public Coder<JmsRecord> getDefaultOutputCoder() {
+ public Coder<JmsRecord> getOutputCoder() {
return SerializableCoder.of(JmsRecord.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 026313a..7fb4260 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -844,7 +844,7 @@ public class KafkaIO {
}
@Override
- public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
+ public Coder<KafkaRecord<K, V>> getOutputCoder() {
return KafkaRecordCoder.of(spec.getKeyCoder(), spec.getValueCoder());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 362792b..144bd80 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -107,7 +107,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
}
@Override
- public Coder<KinesisRecord> getDefaultOutputCoder() {
+ public Coder<KinesisRecord> getOutputCoder() {
return KinesisRecordCoder.of();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index 5b5412c..c612d52 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -440,7 +440,7 @@ public class MongoDbGridFSIO {
}
@Override
- public Coder<ObjectId> getDefaultOutputCoder() {
+ public Coder<ObjectId> getOutputCoder() {
return SerializableCoder.of(ObjectId.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 3b14182..087123a 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -277,7 +277,7 @@ public class MongoDbIO {
}
@Override
- public Coder<Document> getDefaultOutputCoder() {
+ public Coder<Document> getOutputCoder() {
return SerializableCoder.of(Document.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index add5cb5..5aadb80 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -387,7 +387,7 @@ public class MqttIO {
}
@Override
- public Coder<byte[]> getDefaultOutputCoder() {
+ public Coder<byte[]> getOutputCoder() {
return ByteArrayCoder.of();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
index 7aa42c5..b893d43 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
@@ -85,7 +85,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return JAXBCoder.of(spec.getRecordClass());
}