You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/05 20:35:21 UTC
[1/6] beam git commit: This closes #2829
Repository: beam
Updated Branches:
refs/heads/master 86a94998e -> 0490d6b36
This closes #2829
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0490d6b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0490d6b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0490d6b3
Branch: refs/heads/master
Commit: 0490d6b36cfbfe11f3c12873164bae270b4b5232
Parents: 86a9499 c7f3e3c
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 13:30:51 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 13:30:51 2017 -0700
----------------------------------------------------------------------
.../apex/translation/utils/ApexStreamTuple.java | 5 +-
.../UnboundedReadFromBoundedSource.java | 4 +-
.../runners/core/construction/CodersTest.java | 14 +--
.../core/construction/PCollectionsTest.java | 3 +-
.../core/ElementAndRestrictionCoder.java | 17 ++-
.../beam/runners/core/KeyedWorkItemCoder.java | 4 +-
.../beam/runners/core/TimerInternals.java | 6 +-
.../direct/CloningBundleFactoryTest.java | 10 +-
.../beam/runners/direct/DirectRunnerTest.java | 5 +-
.../UnboundedReadEvaluatorFactoryTest.java | 4 +-
.../streaming/SingletonKeyedWorkItemCoder.java | 4 +-
.../runners/dataflow/BatchViewOverrides.java | 8 +-
.../runners/dataflow/internal/IsmFormat.java | 40 ++++--
.../runners/dataflow/util/RandomAccessData.java | 4 +-
.../org/apache/beam/sdk/coders/AtomicCoder.java | 86 +++++++++++++
.../org/apache/beam/sdk/coders/AvroCoder.java | 19 +++
.../apache/beam/sdk/coders/BigDecimalCoder.java | 2 +-
.../beam/sdk/coders/BigEndianIntegerCoder.java | 2 +-
.../beam/sdk/coders/BigEndianLongCoder.java | 4 +-
.../apache/beam/sdk/coders/BigIntegerCoder.java | 4 +-
.../org/apache/beam/sdk/coders/BitSetCoder.java | 4 +-
.../apache/beam/sdk/coders/ByteArrayCoder.java | 6 +-
.../org/apache/beam/sdk/coders/ByteCoder.java | 2 +-
.../java/org/apache/beam/sdk/coders/Coder.java | 121 ++++++++++++++++---
.../apache/beam/sdk/coders/CoderFactories.java | 9 +-
.../org/apache/beam/sdk/coders/CustomCoder.java | 50 +++++++-
.../org/apache/beam/sdk/coders/DoubleCoder.java | 2 +-
.../apache/beam/sdk/coders/DurationCoder.java | 2 +-
.../apache/beam/sdk/coders/InstantCoder.java | 2 +-
.../org/apache/beam/sdk/coders/KvCoder.java | 4 +-
.../org/apache/beam/sdk/coders/ListCoder.java | 3 +-
.../org/apache/beam/sdk/coders/MapCoder.java | 2 +-
.../apache/beam/sdk/coders/NullableCoder.java | 6 +-
.../apache/beam/sdk/coders/StringUtf8Coder.java | 2 +-
.../apache/beam/sdk/coders/StructuredCoder.java | 67 ++--------
.../beam/sdk/coders/TextualIntegerCoder.java | 2 +-
.../org/apache/beam/sdk/coders/VarIntCoder.java | 2 +-
.../org/apache/beam/sdk/coders/VoidCoder.java | 4 +-
.../org/apache/beam/sdk/io/FileBasedSink.java | 4 +-
.../sdk/transforms/ApproximateQuantiles.java | 26 +++-
.../org/apache/beam/sdk/transforms/Combine.java | 12 +-
.../apache/beam/sdk/transforms/CombineFns.java | 4 +-
.../org/apache/beam/sdk/transforms/Count.java | 4 +-
.../org/apache/beam/sdk/transforms/Mean.java | 7 +-
.../org/apache/beam/sdk/transforms/Top.java | 23 +++-
.../beam/sdk/transforms/join/CoGbkResult.java | 2 +-
.../beam/sdk/transforms/join/UnionCoder.java | 7 +-
.../beam/sdk/transforms/windowing/PaneInfo.java | 10 +-
.../org/apache/beam/sdk/util/BitSetCoder.java | 9 +-
.../org/apache/beam/sdk/util/WindowedValue.java | 7 +-
.../beam/sdk/values/TimestampedValue.java | 13 +-
.../beam/sdk/values/ValueInSingleWindow.java | 4 +-
.../beam/sdk/values/ValueWithRecordId.java | 4 +-
.../beam/sdk/coders/CoderRegistryTest.java | 4 +-
.../beam/sdk/coders/DelegateCoderTest.java | 25 ----
.../beam/sdk/coders/NullableCoderTest.java | 2 +-
.../beam/sdk/testing/CoderPropertiesTest.java | 37 +++++-
.../apache/beam/sdk/testing/PAssertTest.java | 4 +-
.../sdk/testing/SerializableMatchersTest.java | 4 +-
.../beam/sdk/testing/WindowSupplierTest.java | 6 +-
.../beam/sdk/transforms/CombineFnsTest.java | 4 +-
.../apache/beam/sdk/transforms/CombineTest.java | 6 +-
.../apache/beam/sdk/transforms/CreateTest.java | 6 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 4 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 6 +-
.../apache/beam/sdk/transforms/ViewTest.java | 4 +-
.../transforms/reflect/DoFnInvokersTest.java | 6 +-
.../apache/beam/sdk/util/CoderUtilsTest.java | 4 +-
.../beam/sdk/util/SerializableUtilsTest.java | 4 +-
.../extensions/protobuf/ByteStringCoder.java | 4 +-
.../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 10 +-
.../io/gcp/bigquery/TableDestinationCoder.java | 4 +-
.../sdk/io/gcp/bigquery/TableRowInfoCoder.java | 4 +-
.../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 4 +-
.../io/gcp/bigquery/WriteBundlesToFiles.java | 11 +-
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 4 +-
.../io/gcp/pubsub/PubsubUnboundedSource.java | 12 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 4 +-
.../beam/sdk/io/hadoop/WritableCoder.java | 18 +++
.../beam/sdk/io/hbase/HBaseMutationCoder.java | 4 +-
.../beam/sdk/io/hbase/HBaseResultCoder.java | 4 +-
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 6 +-
.../beam/sdk/io/kafka/KafkaRecordCoder.java | 4 +-
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 4 +-
.../org/apache/beam/sdk/io/xml/JAXBCoder.java | 18 +++
.../apache/beam/sdk/io/xml/JAXBCoderTest.java | 4 +-
86 files changed, 612 insertions(+), 304 deletions(-)
----------------------------------------------------------------------
[3/6] beam git commit: Reparent many Coders to Atomic or
StructuredCoder
Posted by tg...@apache.org.
Reparent many Coders to Atomic or StructuredCoder
These coders do not take configuration, or take configuration only in
terms of other Coders, and are appropriate to reparent.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/987c2cbc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/987c2cbc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/987c2cbc
Branch: refs/heads/master
Commit: 987c2cbca433f3e0ff12a637f2b0474e772c6beb
Parents: 63258c6
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 10:13:05 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 13:30:51 2017 -0700
----------------------------------------------------------------------
.../apex/translation/utils/ApexStreamTuple.java | 5 ++-
.../UnboundedReadFromBoundedSource.java | 4 +-
.../runners/core/construction/CodersTest.java | 14 +------
.../core/construction/PCollectionsTest.java | 3 +-
.../core/ElementAndRestrictionCoder.java | 17 ++++++++-
.../beam/runners/core/KeyedWorkItemCoder.java | 4 +-
.../beam/runners/core/TimerInternals.java | 6 +--
.../direct/CloningBundleFactoryTest.java | 10 ++---
.../beam/runners/direct/DirectRunnerTest.java | 5 +--
.../UnboundedReadEvaluatorFactoryTest.java | 4 +-
.../streaming/SingletonKeyedWorkItemCoder.java | 4 +-
.../runners/dataflow/BatchViewOverrides.java | 8 ++--
.../runners/dataflow/internal/IsmFormat.java | 40 ++++++++++++++++----
.../runners/dataflow/util/RandomAccessData.java | 4 +-
.../org/apache/beam/sdk/coders/AvroCoder.java | 19 ++++++++++
.../apache/beam/sdk/coders/BigDecimalCoder.java | 2 +-
.../beam/sdk/coders/BigEndianIntegerCoder.java | 2 +-
.../beam/sdk/coders/BigEndianLongCoder.java | 4 +-
.../apache/beam/sdk/coders/BigIntegerCoder.java | 4 +-
.../org/apache/beam/sdk/coders/BitSetCoder.java | 4 +-
.../apache/beam/sdk/coders/ByteArrayCoder.java | 6 +--
.../org/apache/beam/sdk/coders/ByteCoder.java | 2 +-
.../apache/beam/sdk/coders/CoderFactories.java | 9 +++--
.../org/apache/beam/sdk/coders/DoubleCoder.java | 2 +-
.../apache/beam/sdk/coders/DurationCoder.java | 2 +-
.../apache/beam/sdk/coders/InstantCoder.java | 2 +-
.../org/apache/beam/sdk/coders/KvCoder.java | 4 +-
.../org/apache/beam/sdk/coders/ListCoder.java | 3 +-
.../org/apache/beam/sdk/coders/MapCoder.java | 2 +-
.../apache/beam/sdk/coders/NullableCoder.java | 6 +--
.../apache/beam/sdk/coders/StringUtf8Coder.java | 2 +-
.../beam/sdk/coders/TextualIntegerCoder.java | 2 +-
.../org/apache/beam/sdk/coders/VarIntCoder.java | 2 +-
.../org/apache/beam/sdk/coders/VoidCoder.java | 4 +-
.../org/apache/beam/sdk/io/FileBasedSink.java | 4 +-
.../sdk/transforms/ApproximateQuantiles.java | 26 ++++++++++---
.../org/apache/beam/sdk/transforms/Combine.java | 12 ++++--
.../apache/beam/sdk/transforms/CombineFns.java | 4 +-
.../org/apache/beam/sdk/transforms/Count.java | 4 +-
.../org/apache/beam/sdk/transforms/Mean.java | 7 ++--
.../org/apache/beam/sdk/transforms/Top.java | 23 ++++++++++-
.../beam/sdk/transforms/join/CoGbkResult.java | 2 +-
.../beam/sdk/transforms/join/UnionCoder.java | 7 ++--
.../beam/sdk/transforms/windowing/PaneInfo.java | 10 ++++-
.../org/apache/beam/sdk/util/BitSetCoder.java | 9 +++--
.../org/apache/beam/sdk/util/WindowedValue.java | 7 ++--
.../beam/sdk/values/TimestampedValue.java | 13 +++++--
.../beam/sdk/values/ValueInSingleWindow.java | 4 +-
.../beam/sdk/values/ValueWithRecordId.java | 4 +-
.../beam/sdk/coders/CoderRegistryTest.java | 4 +-
.../beam/sdk/coders/DelegateCoderTest.java | 25 ------------
.../beam/sdk/coders/NullableCoderTest.java | 2 +-
.../beam/sdk/testing/CoderPropertiesTest.java | 37 ++++++++++++++++--
.../apache/beam/sdk/testing/PAssertTest.java | 4 +-
.../sdk/testing/SerializableMatchersTest.java | 4 +-
.../beam/sdk/testing/WindowSupplierTest.java | 6 +--
.../beam/sdk/transforms/CombineFnsTest.java | 4 +-
.../apache/beam/sdk/transforms/CombineTest.java | 6 +--
.../apache/beam/sdk/transforms/CreateTest.java | 6 +--
.../beam/sdk/transforms/GroupByKeyTest.java | 4 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 6 +--
.../apache/beam/sdk/transforms/ViewTest.java | 4 +-
.../transforms/reflect/DoFnInvokersTest.java | 6 +--
.../apache/beam/sdk/util/CoderUtilsTest.java | 4 +-
.../beam/sdk/util/SerializableUtilsTest.java | 4 +-
.../extensions/protobuf/ByteStringCoder.java | 4 +-
.../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 10 ++---
.../io/gcp/bigquery/TableDestinationCoder.java | 4 +-
.../sdk/io/gcp/bigquery/TableRowInfoCoder.java | 4 +-
.../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 4 +-
.../io/gcp/bigquery/WriteBundlesToFiles.java | 11 +++++-
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 4 +-
.../io/gcp/pubsub/PubsubUnboundedSource.java | 12 ++++--
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 4 +-
.../beam/sdk/io/hadoop/WritableCoder.java | 18 +++++++++
.../beam/sdk/io/hbase/HBaseMutationCoder.java | 4 +-
.../beam/sdk/io/hbase/HBaseResultCoder.java | 4 +-
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 6 +--
.../beam/sdk/io/kafka/KafkaRecordCoder.java | 4 +-
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 4 +-
.../org/apache/beam/sdk/io/xml/JAXBCoder.java | 18 +++++++++
.../apache/beam/sdk/io/xml/JAXBCoderTest.java | 4 +-
82 files changed, 369 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
index 4ce351b..4aa6ee8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
@@ -31,7 +31,7 @@ import java.util.Objects;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
/**
* The common interface for all objects transmitted through streams.
@@ -149,7 +149,7 @@ public interface ApexStreamTuple<T> {
/**
* Coder for {@link ApexStreamTuple}.
*/
- class ApexStreamTupleCoder<T> extends CustomCoder<ApexStreamTuple<T>> {
+ class ApexStreamTupleCoder<T> extends StructuredCoder<ApexStreamTuple<T>> {
private static final long serialVersionUID = 1L;
final Coder<T> valueCoder;
@@ -194,6 +194,7 @@ public interface ApexStreamTuple<T> {
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
+ this,
this.getClass().getSimpleName() + " requires a deterministic valueCoder",
valueCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 0ea13b8..1424b8b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -35,10 +35,10 @@ import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.Read;
@@ -203,7 +203,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
}
@VisibleForTesting
- static class CheckpointCoder<T> extends CustomCoder<Checkpoint<T>> {
+ static class CheckpointCoder<T> extends StructuredCoder<Checkpoint<T>> {
// The coder for a list of residual elements and their timestamps
private final Coder<List<TimestampedValue<T>>> elemsCoder;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index 32a78fa..765723c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -30,11 +30,11 @@ import java.io.OutputStream;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
@@ -149,7 +149,7 @@ public class CodersTest {
static class Record implements Serializable {}
- private static class RecordCoder extends CustomCoder<Record> {
+ private static class RecordCoder extends AtomicCoder<Record> {
@Override
public void encode(Record value, OutputStream outStream, Context context)
throws CoderException, IOException {}
@@ -159,16 +159,6 @@ public class CodersTest {
throws CoderException, IOException {
return new Record();
}
-
- @Override
- public boolean equals(Object other) {
- return other != null && getClass().equals(other.getClass());
- }
-
- @Override
- public int hashCode() {
- return getClass().hashCode();
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
index c177c58..2c45cbd 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
@@ -158,7 +159,7 @@ public class PCollectionsTest {
@Override
public Coder<BoundedWindow> windowCoder() {
- return new CustomCoder<BoundedWindow>() {
+ return new AtomicCoder<BoundedWindow>() {
@Override public void verifyDeterministic() {}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 64c1e14..83c4e62 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -17,18 +17,20 @@
*/
package org.apache.beam.runners.core;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
/** A {@link Coder} for {@link ElementAndRestriction}. */
@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
public class ElementAndRestrictionCoder<ElementT, RestrictionT>
- extends CustomCoder<ElementAndRestriction<ElementT, RestrictionT>> {
+ extends StructuredCoder<ElementAndRestriction<ElementT, RestrictionT>> {
private final Coder<ElementT> elementCoder;
private final Coder<RestrictionT> restrictionCoder;
@@ -65,6 +67,17 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT>
return ElementAndRestriction.of(key, value);
}
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return ImmutableList.of(elementCoder, restrictionCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ elementCoder.verifyDeterministic();
+ restrictionCoder.verifyDeterministic();
+ }
+
public Coder<ElementT> getElementCoder() {
return elementCoder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index fddf7fa..e1872b5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -26,8 +26,8 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
/**
* A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}.
*/
-public class KeyedWorkItemCoder<K, ElemT> extends CustomCoder<KeyedWorkItem<K, ElemT>> {
+public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem<K, ElemT>> {
/**
* Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
* coder.
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 21fe430..888c11f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -27,9 +27,9 @@ import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -224,7 +224,7 @@ public interface TimerInternals {
/**
* A {@link Coder} for {@link TimerData}.
*/
- class TimerDataCoder extends CustomCoder<TimerData> {
+ class TimerDataCoder extends StructuredCoder<TimerData> {
private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
private static final InstantCoder INSTANT_CODER = InstantCoder.of();
private final Coder<? extends BoundedWindow> windowCoder;
@@ -266,7 +266,7 @@ public interface TimerInternals {
@Override
public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic("window coder must be deterministic", windowCoder);
+ verifyDeterministic(this, "window coder must be deterministic", windowCoder);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index 7d037d1..33d171e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -173,7 +173,7 @@ public class CloningBundleFactoryTest {
}
static class Record {}
- static class RecordNoEncodeCoder extends CustomCoder<Record> {
+ static class RecordNoEncodeCoder extends AtomicCoder<Record> {
@Override
public void encode(
@@ -192,7 +192,7 @@ public class CloningBundleFactoryTest {
}
}
- static class RecordNoDecodeCoder extends CustomCoder<Record> {
+ static class RecordNoDecodeCoder extends AtomicCoder<Record> {
@Override
public void encode(
Record value,
@@ -208,7 +208,7 @@ public class CloningBundleFactoryTest {
}
}
- private static class RecordStructuralValueCoder extends CustomCoder<Record> {
+ private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
@Override
public void encode(
Record value,
@@ -240,7 +240,7 @@ public class CloningBundleFactoryTest {
}
private static class RecordNotConsistentWithEqualsStructuralValueCoder
- extends CustomCoder<Record> {
+ extends AtomicCoder<Record> {
@Override
public void encode(
Record value,
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 428c6fc..0fe9585 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -44,9 +44,9 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -523,8 +523,7 @@ public class DirectRunnerTest implements Serializable {
p.run();
}
- private static class LongNoDecodeCoder extends CustomCoder<Long> {
-
+ private static class LongNoDecodeCoder extends AtomicCoder<Long> {
@Override
public void encode(
Long value, OutputStream outStream, Context context) throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index ceb078b..b9ba7f4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -46,10 +46,10 @@ import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
@@ -586,7 +586,7 @@ public class UnboundedReadEvaluatorFactoryTest {
return finalized;
}
- public static class Coder extends CustomCoder<TestCheckpointMark> {
+ public static class Coder extends AtomicCoder<TestCheckpointMark> {
@Override
public void encode(
TestCheckpointMark value,
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index c73700f..f218693 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -26,7 +26,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.util.WindowedValue;
* Singleton keyed work item coder.
*/
public class SingletonKeyedWorkItemCoder<K, ElemT>
- extends CustomCoder<SingletonKeyedWorkItem<K, ElemT>> {
+ extends StructuredCoder<SingletonKeyedWorkItem<K, ElemT>> {
/**
* Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
* coder.
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ef2bfed..ecd0365 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -50,11 +50,11 @@ import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -1335,7 +1335,7 @@ class BatchViewOverrides {
* A {@link Coder} for {@link TransformedMap}s.
*/
static class TransformedMapCoder<K, V1, V2>
- extends CustomCoder<TransformedMap<K, V1, V2>> {
+ extends StructuredCoder<TransformedMap<K, V1, V2>> {
private final Coder<Function<V1, V2>> transformCoder;
private final Coder<Map<K, V1>> originalMapCoder;
@@ -1373,8 +1373,8 @@ class BatchViewOverrides {
@Override
public void verifyDeterministic()
throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
- verifyDeterministic("Expected transform coder to be deterministic.", transformCoder);
- verifyDeterministic("Expected map coder to be deterministic.", originalMapCoder);
+ verifyDeterministic(this, "Expected transform coder to be deterministic.", transformCoder);
+ verifyDeterministic(this, "Expected map coder to be deterministic.", originalMapCoder);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index fbfe49a..aed514a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -32,14 +32,17 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.VarInt;
@@ -356,8 +359,9 @@ public class IsmFormat {
@Override
public void verifyDeterministic() throws Coder.NonDeterministicException {
- verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
- verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
+ verifyDeterministic(
+ this, "Key component coders expected to be deterministic.", keyComponentCoders);
+ verifyDeterministic(this, "Value coder expected to be deterministic.", valueCoder);
}
@Override
@@ -393,6 +397,28 @@ public class IsmFormat {
}
return super.structuralValue(record);
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ }
+ if (!(other instanceof IsmRecordCoder)) {
+ return false;
+ }
+ IsmRecordCoder<?> that = (IsmRecordCoder<?>) other;
+ return Objects.equals(this.numberOfShardKeyCoders, that.numberOfShardKeyCoders)
+ && Objects.equals(
+ this.numberOfMetadataShardKeyCoders, that.numberOfMetadataShardKeyCoders)
+ && Objects.equals(this.keyComponentCoders, that.keyComponentCoders)
+ && Objects.equals(this.valueCoder, that.valueCoder);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ numberOfShardKeyCoders, numberOfMetadataShardKeyCoders, keyComponentCoders, valueCoder);
+ }
}
/**
@@ -450,7 +476,7 @@ public class IsmFormat {
* A coder for metadata key component. Can be used to wrap key component coder allowing for
* the metadata key component to be used as a place holder instead of an actual key.
*/
- public static class MetadataKeyCoder<K> extends CustomCoder<K> {
+ public static class MetadataKeyCoder<K> extends StructuredCoder<K> {
public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
checkNotNull(keyCoder);
return new MetadataKeyCoder<>(keyCoder);
@@ -497,7 +523,7 @@ public class IsmFormat {
@Override
public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic("Expected key coder to be deterministic", keyCoder);
+ verifyDeterministic(this, "Expected key coder to be deterministic", keyCoder);
}
}
@@ -584,7 +610,7 @@ public class IsmFormat {
* <li>indexOffset (variable length long encoding)</li>
* </ul>
*/
- public static class IsmShardCoder extends CustomCoder<IsmShard> {
+ public static class IsmShardCoder extends AtomicCoder<IsmShard> {
private static final IsmShardCoder INSTANCE = new IsmShardCoder();
/** Returns an IsmShardCoder. */
@@ -649,7 +675,7 @@ public class IsmFormat {
}
/** A {@link Coder} for {@link KeyPrefix}. */
- public static final class KeyPrefixCoder extends CustomCoder<KeyPrefix> {
+ public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
public static KeyPrefixCoder of() {
@@ -721,7 +747,7 @@ public class IsmFormat {
}
/** A {@link Coder} for {@link Footer}. */
- public static final class FooterCoder extends CustomCoder<Footer> {
+ public static final class FooterCoder extends AtomicCoder<Footer> {
private static final FooterCoder INSTANCE = new FooterCoder();
public static FooterCoder of() {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
index 66548e2..4e94515 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
@@ -30,10 +30,10 @@ import java.io.OutputStream;
import java.util.Arrays;
import java.util.Comparator;
import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.VarInt;
/**
@@ -55,7 +55,7 @@ public class RandomAccessData {
*
* <p>This coder does not support encoding positive infinity.
*/
- public static class RandomAccessDataCoder extends CustomCoder<RandomAccessData> {
+ public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
public static RandomAccessDataCoder of() {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 1e01f1a..2aa2b44 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -29,6 +29,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
@@ -681,4 +682,22 @@ public class AvroCoder<T> extends CustomCoder<T> {
throw new IllegalArgumentException("Unable to get field " + name + " from " + originalClazz);
}
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ }
+ if (!(other instanceof AvroCoder)) {
+ return false;
+ }
+ AvroCoder<?> that = (AvroCoder<?>) other;
+ return Objects.equals(this.schemaSupplier.get(), that.schemaSupplier.get())
+ && Objects.equals(this.typeDescriptor, that.typeDescriptor);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schemaSupplier.get(), typeDescriptor);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index d628203..aadf085 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -32,7 +32,7 @@ import java.math.MathContext;
* {@link BigInteger}, when scaled (with unlimited precision, aka {@link MathContext#UNLIMITED}),
* yields the expected {@link BigDecimal}.
*/
-public class BigDecimalCoder extends CustomCoder<BigDecimal> {
+public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
public static BigDecimalCoder of() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
index 81c5e94..c3c7a96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link BigEndianIntegerCoder} encodes {@link Integer Integers} in 4 bytes, big-endian.
*/
-public class BigEndianIntegerCoder extends CustomCoder<Integer> {
+public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
public static BigEndianIntegerCoder of() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
index 173e910..5ef4878 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
@@ -30,9 +29,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link BigEndianLongCoder} encodes {@link Long}s in 8 bytes, big-endian.
*/
-public class BigEndianLongCoder extends CustomCoder<Long> {
+public class BigEndianLongCoder extends AtomicCoder<Long> {
- @JsonCreator
public static BigEndianLongCoder of() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
index a739da7..6d14d17 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
@@ -28,7 +28,7 @@ import java.math.BigInteger;
* A {@link BigIntegerCoder} encodes a {@link BigInteger} as a byte array containing the big endian
* two's-complement representation, encoded via {@link ByteArrayCoder}.
*/
-public class BigIntegerCoder extends CustomCoder<BigInteger> {
+public class BigIntegerCoder extends AtomicCoder<BigInteger> {
public static BigIntegerCoder of() {
return INSTANCE;
@@ -55,7 +55,7 @@ public class BigIntegerCoder extends CustomCoder<BigInteger> {
}
@Override
- public void verifyDeterministic() throws NonDeterministicException {
+ public void verifyDeterministic() {
BYTE_ARRAY_CODER.verifyDeterministic();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
index 5a4db24..f49776b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
@@ -25,7 +25,7 @@ import java.util.BitSet;
/**
* Coder for {@link BitSet}.
*/
-public class BitSetCoder extends CustomCoder<BitSet> {
+public class BitSetCoder extends AtomicCoder<BitSet> {
private static final BitSetCoder INSTANCE = new BitSetCoder();
private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
@@ -53,7 +53,7 @@ public class BitSetCoder extends CustomCoder<BitSet> {
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
- "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
+ this, "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index cba8d49..28cb627 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
@@ -40,9 +39,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* encoded via a {@link VarIntCoder}.</li>
* </ul>
*/
-public class ByteArrayCoder extends StructuredCoder<byte[]> {
+public class ByteArrayCoder extends AtomicCoder<byte[]> {
- @JsonCreator
public static ByteArrayCoder of() {
return INSTANCE;
}
@@ -117,7 +115,7 @@ public class ByteArrayCoder extends StructuredCoder<byte[]> {
}
@Override
- public void verifyDeterministic() throws NonDeterministicException {}
+ public void verifyDeterministic() {}
/**
* {@inheritDoc}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
index 1a1be64..6e4318e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link ByteCoder} encodes {@link Byte} values in 1 byte using Java serialization.
*/
-public class ByteCoder extends CustomCoder<Byte> {
+public class ByteCoder extends AtomicCoder<Byte> {
public static ByteCoder of() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
index 2a1d792..4f05c95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
@@ -239,11 +239,12 @@ public final class CoderFactories {
}
/**
- * If {@code coderType} is a subclass of {@link Coder} for a specific
- * type {@code T}, returns {@code T.class}. Otherwise, raises IllegalArgumentException.
+ * If {@code coderType} is a subclass of {@link Coder} for a specific type {@code T}, returns
+ * {@code T.class}. Otherwise, raises IllegalArgumentException.
*/
- private <T> TypeDescriptor<T> getCodedType(TypeDescriptor<? extends Coder> coderType) {
- TypeDescriptor<?> coderSupertype = coderType.getSupertype(Coder.class);
+ private <T, CoderT extends Coder> TypeDescriptor<T> getCodedType(
+ TypeDescriptor<CoderT> coderType) {
+ TypeDescriptor<? super CoderT> coderSupertype = coderType.getSupertype(Coder.class);
ParameterizedType coderIface = (ParameterizedType) coderSupertype.getType();
@SuppressWarnings("unchecked")
TypeDescriptor<T> token =
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
index 06e7dae..12bc5e8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link DoubleCoder} encodes {@link Double} values in 8 bytes using Java serialization.
*/
-public class DoubleCoder extends CustomCoder<Double> {
+public class DoubleCoder extends AtomicCoder<Double> {
public static DoubleCoder of() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index 10a83ef..7b49d1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -29,7 +29,7 @@ import org.joda.time.ReadableDuration;
* A {@link Coder} that encodes a joda {@link Duration} as a {@link Long} using the format of
* {@link VarLongCoder}.
*/
-public class DurationCoder extends CustomCoder<ReadableDuration> {
+public class DurationCoder extends AtomicCoder<ReadableDuration> {
public static DurationCoder of() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index cfd1979..56ed12b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -29,7 +29,7 @@ import org.joda.time.Instant;
* A {@link Coder} for joda {@link Instant} that encodes it as a big endian {@link Long}
* shifted such that lexicographic ordering of the bytes corresponds to chronological order.
*/
-public class InstantCoder extends CustomCoder<Instant> {
+public class InstantCoder extends AtomicCoder<Instant> {
public static InstantCoder of() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 8a689f7..35b7449 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -89,8 +89,8 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
@Override
public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic("Key coder must be deterministic", getKeyCoder());
- verifyDeterministic("Value coder must be deterministic", getValueCoder());
+ verifyDeterministic(this, "Key coder must be deterministic", getKeyCoder());
+ verifyDeterministic(this, "Value coder must be deterministic", getValueCoder());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
index 32467d2..70bbf93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
@@ -58,8 +58,7 @@ public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
*/
@Override
public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic(
- "ListCoder.elemCoder must be deterministic", getElemCoder());
+ verifyDeterministic(this, "ListCoder.elemCoder must be deterministic", getElemCoder());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index e2c4e28..da2bf50 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.values.TypeParameter;
* @param <K> the type of the keys of the KVs being transcoded
* @param <V> the type of the values of the KVs being transcoded
*/
-public class MapCoder<K, V> extends CustomCoder<Map<K, V>> {
+public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
/**
* Produces a MapCoder with the given keyCoder and valueCoder.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index 747d91c..d1eea9a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
*
* @param <T> the type of the values being transcoded
*/
-public class NullableCoder<T> extends CustomCoder<T> {
+public class NullableCoder<T> extends StructuredCoder<T> {
public static <T> NullableCoder<T> of(Coder<T> valueCoder) {
if (valueCoder instanceof NullableCoder) {
return (NullableCoder<T>) valueCoder;
@@ -93,11 +93,11 @@ public class NullableCoder<T> extends CustomCoder<T> {
/**
* {@code NullableCoder} is deterministic if the nested {@code Coder} is.
*
- * {@inheritDoc}
+ * <p>{@inheritDoc}
*/
@Override
public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic("Value coder must be deterministic", valueCoder);
+ verifyDeterministic(this, "Value coder must be deterministic", valueCoder);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index f0a0969..42931ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* If in a nested context, prefixes the string with an integer length field,
* encoded via a {@link VarIntCoder}.
*/
-public class StringUtf8Coder extends CustomCoder<String> {
+public class StringUtf8Coder extends AtomicCoder<String> {
public static StringUtf8Coder of() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
index 91a46ea..9743c4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* A {@link Coder} that encodes {@code Integer Integers} as the ASCII bytes of
* their textual, decimal, representation.
*/
-public class TextualIntegerCoder extends CustomCoder<Integer> {
+public class TextualIntegerCoder extends AtomicCoder<Integer> {
public static TextualIntegerCoder of() {
return new TextualIntegerCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
index fcc0335..30f9c09 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* numbers always take 5 bytes, so {@link BigEndianIntegerCoder} may be preferable for
* integers that are known to often be large or negative.
*/
-public class VarIntCoder extends CustomCoder<Integer> {
+public class VarIntCoder extends AtomicCoder<Integer> {
public static VarIntCoder of() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
index a65fa5e..829bd20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -25,9 +24,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link Coder} for {@link Void}. Uses zero bytes per {@link Void}.
*/
-public class VoidCoder extends CustomCoder<Void> {
+public class VoidCoder extends AtomicCoder<Void> {
- @JsonCreator
public static VoidCoder of() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 0daf5dc..20fab9b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -43,8 +43,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
@@ -938,7 +938,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
/**
* A coder for {@link FileResult} objects.
*/
- public static final class FileResultCoder extends CustomCoder<FileResult> {
+ public static final class FileResultCoder extends AtomicCoder<FileResult> {
private static final FileResultCoder INSTANCE = new FileResultCoder();
private final NullableCoder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 5432f09..b05f223 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.PriorityQueue;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -762,13 +763,28 @@ public class ApproximateQuantiles {
}
@Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ }
+ if (!(other instanceof QuantileStateCoder)) {
+ return false;
+ }
+ QuantileStateCoder<?, ?> that = (QuantileStateCoder<?, ?>) other;
+ return Objects.equals(this.elementCoder, that.elementCoder)
+ && Objects.equals(this.compareFn, that.compareFn);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(elementCoder, compareFn);
+ }
+
+ @Override
public void verifyDeterministic() throws NonDeterministicException {
+ verifyDeterministic(this, "QuantileState.ElementCoder must be deterministic", elementCoder);
verifyDeterministic(
- "QuantileState.ElementCoder must be deterministic",
- elementCoder);
- verifyDeterministic(
- "QuantileState.ElementListCoder must be deterministic",
- elementListCoder);
+ this, "QuantileState.ElementListCoder must be deterministic", elementListCoder);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 666db3b..b9cdbd5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -35,10 +36,10 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.DelegateCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -522,7 +523,7 @@ public class Combine {
/**
* A {@link Coder} for a {@link Holder}.
*/
- private static class HolderCoder<V> extends CustomCoder<Holder<V>> {
+ private static class HolderCoder<V> extends StructuredCoder<Holder<V>> {
private Coder<V> valueCoder;
@@ -552,6 +553,11 @@ public class Combine {
}
@Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.singletonList(valueCoder);
+ }
+
+ @Override
public void verifyDeterministic() throws NonDeterministicException {
valueCoder.verifyDeterministic();
}
@@ -1954,7 +1960,7 @@ public class Combine {
}
private static class InputOrAccumCoder<InputT, AccumT>
- extends CustomCoder<InputOrAccum<InputT, AccumT>> {
+ extends StructuredCoder<InputOrAccum<InputT, AccumT>> {
private final Coder<InputT> inputCoder;
private final Coder<AccumT> accumCoder;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index cc02dcf..d4c97bc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
@@ -524,7 +524,7 @@ public class CombineFns {
}
}
- private static class ComposedAccumulatorCoder extends CustomCoder<Object[]> {
+ private static class ComposedAccumulatorCoder extends StructuredCoder<Object[]> {
private List<Coder<Object>> coders;
private int codersCount;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 78a6cd1..753e14c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -23,10 +23,10 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
import java.util.Iterator;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.util.VarInt;
@@ -167,7 +167,7 @@ public class Count {
@Override
public Coder<long[]> getAccumulatorCoder(CoderRegistry registry,
Coder<T> inputCoder) {
- return new CustomCoder<long[]>() {
+ return new AtomicCoder<long[]>() {
@Override
public void encode(long[] value, OutputStream outStream, Context context)
throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index a6808cf..a309954 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -22,11 +22,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
@@ -180,9 +180,8 @@ public class Mean {
}
}
- static class CountSumCoder<NumT extends Number>
- extends CustomCoder<CountSum<NumT>> {
- private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
+ static class CountSumCoder<NumT extends Number> extends AtomicCoder<CountSum<NumT>> {
+ private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index e42c0b2..9d5db74 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
import java.util.PriorityQueue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -551,8 +552,7 @@ public class Top {
@Override
public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic(
- "HeapCoder requires a deterministic list coder", listCoder);
+ verifyDeterministic(this, "HeapCoder requires a deterministic list coder", listCoder);
}
@Override
@@ -568,5 +568,24 @@ public class Top {
throws Exception {
listCoder.registerByteSizeObserver(value.asList(), observer, context);
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ }
+ if (!(other instanceof BoundedHeapCoder)) {
+ return false;
+ }
+ BoundedHeapCoder<?, ?> that = (BoundedHeapCoder<?, ?>) other;
+ return Objects.equals(this.compareFn, that.compareFn)
+ && Objects.equals(this.listCoder, that.listCoder)
+ && this.maximumSize == that.maximumSize;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(compareFn, listCoder, maximumSize);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 02e1185..e9a3571 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -300,7 +300,7 @@ public class CoGbkResult {
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
- "CoGbkResult requires the union coder to be deterministic", unionCoder);
+ this, "CoGbkResult requires the union coder to be deterministic", unionCoder);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
index f411cd1..4a2a286 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
@@ -23,14 +23,14 @@ import java.io.OutputStream;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
/**
* A UnionCoder encodes RawUnionValues.
*/
-public class UnionCoder extends CustomCoder<RawUnionValue> {
+public class UnionCoder extends StructuredCoder<RawUnionValue> {
// TODO: Think about how to integrate this with a schema object (i.e.
// a tuple of tuple tags).
/**
@@ -134,7 +134,6 @@ public class UnionCoder extends CustomCoder<RawUnionValue> {
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
- "UnionCoder is only deterministic if all element coders are",
- elementCoders);
+ this, "UnionCoder is only deterministic if all element coders are", elementCoders);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index faf3ca9..79ce2f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -26,9 +26,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -307,7 +307,7 @@ public final class PaneInfo {
/**
* A Coder for encoding PaneInfo instances.
*/
- public static class PaneInfoCoder extends CustomCoder<PaneInfo> {
+ public static class PaneInfoCoder extends AtomicCoder<PaneInfo> {
private enum Encoding {
FIRST,
ONE_INDEX,
@@ -340,6 +340,12 @@ public final class PaneInfo {
public static final PaneInfoCoder INSTANCE = new PaneInfoCoder();
+ public static PaneInfoCoder of() {
+ return INSTANCE;
+ }
+
+ private PaneInfoCoder() {}
+
@Override
public void encode(PaneInfo value, final OutputStream outStream, Coder.Context context)
throws CoderException, IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
index b0e9b5c..b646bf6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
@@ -21,15 +21,17 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.BitSet;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
/**
* Coder for the BitSet used to track child-trigger finished states.
+ *
+ * @deprecated use {@link org.apache.beam.sdk.coders.BitSetCoder} instead
*/
@Deprecated
-public class BitSetCoder extends CustomCoder<BitSet> {
+public class BitSetCoder extends AtomicCoder<BitSet> {
private static final BitSetCoder INSTANCE = new BitSetCoder();
private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
@@ -54,8 +56,7 @@ public class BitSetCoder extends CustomCoder<BitSet> {
@Override
public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic(
- "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
+ BYTE_ARRAY_CODER.verifyDeterministic();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 13e499d..1e72550 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -663,11 +663,9 @@ public abstract class WindowedValue<T> {
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
- "FullWindowedValueCoder requires a deterministic valueCoder",
- valueCoder);
+ this, "FullWindowedValueCoder requires a deterministic valueCoder", valueCoder);
verifyDeterministic(
- "FullWindowedValueCoder requires a deterministic windowCoder",
- windowCoder);
+ this, "FullWindowedValueCoder requires a deterministic windowCoder", windowCoder);
}
@Override
@@ -728,6 +726,7 @@ public abstract class WindowedValue<T> {
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
+ this,
"ValueOnlyWindowedValueCoder requires a deterministic valueCoder",
valueCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index cde9a40..a9f3929 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -23,11 +23,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
@@ -85,7 +86,7 @@ public class TimestampedValue<V> {
/////////////////////////////////////////////////////////////////////////////
/** A {@link Coder} for {@link TimestampedValue}. */
- public static class TimestampedValueCoder<T> extends CustomCoder<TimestampedValue<T>> {
+ public static class TimestampedValueCoder<T> extends StructuredCoder<TimestampedValue<T>> {
private final Coder<T> valueCoder;
@@ -119,8 +120,7 @@ public class TimestampedValue<V> {
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
- "TimestampedValueCoder requires a deterministic valueCoder",
- valueCoder);
+ this, "TimestampedValueCoder requires a deterministic valueCoder", valueCoder);
}
@Override
@@ -141,6 +141,11 @@ public class TimestampedValue<V> {
return new TypeDescriptor<TimestampedValue<T>>() {}.where(
new TypeParameter<T>() {}, valueCoder.getEncodedTypeDescriptor());
}
+
+ @Override
+ public List<? extends Coder<?>> getComponents() {
+ return Collections.singletonList(valueCoder);
+ }
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index 1fd356b..3ecbaa2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -24,8 +24,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.Instant;
@@ -56,7 +56,7 @@ public abstract class ValueInSingleWindow<T> {
}
/** A coder for {@link ValueInSingleWindow}. */
- public static class Coder<T> extends CustomCoder<ValueInSingleWindow<T>> {
+ public static class Coder<T> extends StructuredCoder<ValueInSingleWindow<T>> {
private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
index 0d92f40..3f057e1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
@@ -27,7 +27,7 @@ import java.util.Objects;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.DoFn;
/**
@@ -85,7 +85,7 @@ public class ValueWithRecordId<ValueT> {
* A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}.
*/
public static class ValueWithRecordIdCoder<ValueT>
- extends CustomCoder<ValueWithRecordId<ValueT>> {
+ extends StructuredCoder<ValueWithRecordId<ValueT>> {
public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) {
return new ValueWithRecordIdCoder<>(valueCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index fe21a1c..5107355 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -99,7 +99,7 @@ public class CoderRegistryTest {
}
@SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes
- private class MyListCoder extends CustomCoder<List> {
+ private class MyListCoder extends AtomicCoder<List> {
@Override
public void encode(List value, OutputStream outStream, Context context)
throws CoderException, IOException {
@@ -441,7 +441,7 @@ public class CoderRegistryTest {
private static class MyValue { }
- private static class MyValueCoder extends CustomCoder<MyValue> {
+ private static class MyValueCoder extends AtomicCoder<MyValue> {
private static final MyValueCoder INSTANCE = new MyValueCoder();
private static final TypeDescriptor<MyValue> TYPE_DESCRIPTOR = TypeDescriptor.of(MyValue.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
index 5ff272f..8aeb22a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
@@ -24,8 +24,6 @@ import static org.junit.Assert.assertThat;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
@@ -108,29 +106,6 @@ public class DelegateCoderTest implements Serializable {
private static final String TEST_ENCODING_ID = "test-encoding-id";
private static final String TEST_ALLOWED_ENCODING = "test-allowed-encoding";
- private static class TestAllowedEncodingsCoder extends CustomCoder<Integer> {
-
- @Override
- public void encode(Integer value, OutputStream outstream, Context context) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Integer decode(InputStream instream, Context context) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void verifyDeterministic() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Collections.emptyList();
- }
- }
-
@Test
public void testCoderEquals() throws Exception {
DelegateCoder.CodingFunction<Integer, Integer> identityFn =
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index 894d2d1..c0a4bed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -169,7 +169,7 @@ public class NullableCoderTest {
assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(String.class)));
}
- private static class EntireStreamExpectingCoder extends CustomCoder<String> {
+ private static class EntireStreamExpectingCoder extends AtomicCoder<String> {
@Override
public void encode(
String value, OutputStream outStream, Context context) throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
index f337f36..164d221 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
@@ -24,6 +24,7 @@ import com.google.common.base.Strings;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
@@ -47,7 +48,7 @@ public class CoderPropertiesTest {
}
/** A coder that says it is not deterministic but actually is. */
- public static class NonDeterministicCoder extends CustomCoder<String> {
+ public static class NonDeterministicCoder extends AtomicCoder<String> {
@Override
public void encode(String value, OutputStream outStream, Context context)
throws CoderException, IOException {
@@ -59,17 +60,23 @@ public class CoderPropertiesTest {
throws CoderException, IOException {
return StringUtf8Coder.of().decode(inStream, context);
}
+
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(this, "Not Deterministic");
+ }
}
@Test
public void testNonDeterministicCoder() throws Exception {
try {
CoderProperties.coderDeterministic(new NonDeterministicCoder(), "TestData", "TestData");
- fail("Expected AssertionError");
} catch (AssertionError error) {
assertThat(error.getMessage(),
CoreMatchers.containsString("Expected that the coder is deterministic"));
+ // success!
+ return;
}
+ fail("Expected AssertionError");
}
@Test
@@ -84,7 +91,7 @@ public class CoderPropertiesTest {
}
/** A coder that is non-deterministic because it adds a string to the value. */
- private static class BadDeterminsticCoder extends CustomCoder<String> {
+ private static class BadDeterminsticCoder extends AtomicCoder<String> {
public BadDeterminsticCoder() {
}
@@ -141,6 +148,17 @@ public class CoderPropertiesTest {
String decodedValue = StringUtf8Coder.of().decode(inStream, context);
return decodedValue.substring(0, decodedValue.length() - changedState);
}
+
+ @Override
+ public boolean equals(Object other) {
+ return other instanceof StateChangingSerializingCoder
+ && ((StateChangingSerializingCoder) other).changedState == this.changedState;
+ }
+
+ @Override
+ public int hashCode() {
+ return changedState;
+ }
}
@Test
@@ -175,6 +193,17 @@ public class CoderPropertiesTest {
throws CoderException, IOException {
return StringUtf8Coder.of().decode(inStream, context);
}
+
+ @Override
+ public boolean equals(Object other) {
+ return (other instanceof ForgetfulSerializingCoder)
+ && ((ForgetfulSerializingCoder) other).lostState == lostState;
+ }
+
+ @Override
+ public int hashCode() {
+ return lostState;
+ }
}
@Test
@@ -185,7 +214,7 @@ public class CoderPropertiesTest {
}
/** A coder which closes the underlying stream during encoding and decoding. */
- public static class ClosingCoder extends CustomCoder<String> {
+ public static class ClosingCoder extends AtomicCoder<String> {
@Override
public void encode(String value, OutputStream outStream, Context context) throws IOException {
outStream.close();
[4/6] beam git commit: Re-add AtomicCoder
Posted by tg...@apache.org.
Re-add AtomicCoder
This is a moderately useful base class for coders which take no
configuration.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/655947b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/655947b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/655947b5
Branch: refs/heads/master
Commit: 655947b597972d9fd6e1d3a777970b0b1152fa05
Parents: 86a9499
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 10:08:32 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 13:30:51 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/AtomicCoder.java | 86 ++++++++++++++++++++
1 file changed, 86 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/655947b5/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
new file mode 100644
index 0000000..528cfb0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.coders;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link Coder} that has no component {@link Coder Coders} or other state.
+ *
+ * <p>Note that, unless the behavior is overridden, atomic coders are presumed to be deterministic.
+ *
+ * <p>All atomic coders of the same class are considered to be equal to each other. As a result,
+ * an {@link AtomicCoder} should have no associated state.
+ *
+ * @param <T> the type of the values being transcoded
+ */
+public abstract class AtomicCoder<T> extends StructuredCoder<T> {
+ /**
+ * Returns an empty list.
+ *
+ * <p>{@link CoderFactories#fromStaticMethods(Class)} builds a {@link CoderFactory} from the
+ * {@code #of()} method and this method, used to determine the components of an object. Because
+ * {@link AtomicCoder} has no components, always returns an empty list.
+ *
+ * @param exampleValue unused, but part of the latent interface expected by {@link
+ * CoderFactories#fromStaticMethods}
+ */
+ public static <T> List<Object> getInstanceComponents(T exampleValue) {
+ return Collections.emptyList();
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * @throws NonDeterministicException
+ */
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {}
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * @return the empty {@link List}.
+ */
+ @Override
+ public final List<? extends Coder<?>> getComponents() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * @return true if the other object has the same class as this {@link AtomicCoder}.
+ */
+ @Override
+ public final boolean equals(Object other) {
+ return other != null && this.getClass().equals(other.getClass());
+ }
+
+ @Override
+ public final int hashCode() {
+ return this.getClass().hashCode();
+ }
+}
[2/6] beam git commit: Reparent many Coders to Atomic or
StructuredCoder
Posted by tg...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index cfe7436..2ef892c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -35,8 +35,8 @@ import java.io.Serializable;
import java.util.Collections;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.GenerateSequence;
@@ -88,7 +88,7 @@ public class PAssertTest implements Serializable {
}
}
- private static class NotSerializableObjectCoder extends CustomCoder<NotSerializableObject> {
+ private static class NotSerializableObjectCoder extends AtomicCoder<NotSerializableObject> {
private NotSerializableObjectCoder() { }
private static final NotSerializableObjectCoder INSTANCE = new NotSerializableObjectCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
index ddc92d6..db5ff2e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
@@ -29,8 +29,8 @@ import com.google.common.collect.ImmutableList;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.hamcrest.Matchers;
@@ -151,7 +151,7 @@ public class SerializableMatchersTest implements Serializable {
}
}
- private static class NotSerializableClassCoder extends CustomCoder<NotSerializableClass> {
+ private static class NotSerializableClassCoder extends AtomicCoder<NotSerializableClass> {
@Override
public void encode(NotSerializableClass value, OutputStream outStream, Coder.Context context) {
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
index 38a2fa2..546683b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
@@ -24,8 +24,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -72,12 +72,12 @@ public class WindowSupplierTest {
Collections.<BoundedWindow>singleton(window));
}
- private static class FailingCoder extends CustomCoder<BoundedWindow> {
+ private static class FailingCoder extends AtomicCoder<BoundedWindow> {
@Override
public void encode(
BoundedWindow value, OutputStream outStream, Context context)
throws CoderException, IOException {
- throw new CoderException("Test Enccode Exception");
+ throw new CoderException("Test Encode Exception");
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 9250dfa..62edac9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -28,10 +28,10 @@ import java.io.OutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -326,7 +326,7 @@ public class CombineFnsTest {
}
}
- private static class UserStringCoder extends CustomCoder<UserString> {
+ private static class UserStringCoder extends AtomicCoder<UserString> {
public static UserStringCoder of() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 82c2504..12619e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -41,12 +41,12 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
@@ -874,7 +874,7 @@ public class CombineTest implements Serializable {
/**
* A {@link Coder} for {@link CountSum}.
*/
- private class CountSumCoder extends CustomCoder<CountSum> {
+ private class CountSumCoder extends AtomicCoder<CountSum> {
@Override
public void encode(CountSum value, OutputStream outStream,
Context context) throws CoderException, IOException {
@@ -923,7 +923,7 @@ public class CombineTest implements Serializable {
}
public static Coder<Accumulator> getCoder() {
- return new CustomCoder<Accumulator>() {
+ return new AtomicCoder<Accumulator>() {
@Override
public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context)
throws CoderException, IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 89a1f33..76f61b3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -36,10 +36,10 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -132,7 +132,7 @@ public class CreateTest {
static class Record2 extends Record {
}
- private static class RecordCoder extends CustomCoder<Record> {
+ private static class RecordCoder extends AtomicCoder<Record> {
@Override
public void encode(Record value, OutputStream outStream, Context context)
throws CoderException, IOException {}
@@ -201,7 +201,7 @@ public class CreateTest {
return myString.equals(((UnserializableRecord) o).myString);
}
- static class UnserializableRecordCoder extends CustomCoder<UnserializableRecord> {
+ static class UnserializableRecordCoder extends AtomicCoder<UnserializableRecord> {
private final Coder<String> stringCoder = StringUtf8Coder.of();
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 9cb642a..6982e01 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -38,9 +38,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -453,7 +453,7 @@ public class GroupByKeyTest {
/**
* Deterministic {@link Coder} for {@link BadEqualityKey}.
*/
- static class DeterministicKeyCoder extends CustomCoder<BadEqualityKey> {
+ static class DeterministicKeyCoder extends AtomicCoder<BadEqualityKey> {
public static DeterministicKeyCoder of() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index ffdf3d0..073957f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,9 +55,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.SetCoder;
@@ -976,7 +976,7 @@ public class ParDoTest implements Serializable {
private static class TestDummy { }
- private static class TestDummyCoder extends CustomCoder<TestDummy> {
+ private static class TestDummyCoder extends AtomicCoder<TestDummy> {
private TestDummyCoder() { }
private static final TestDummyCoder INSTANCE = new TestDummyCoder();
@@ -1085,7 +1085,7 @@ public class ParDoTest implements Serializable {
}
}
- private static class MyIntegerCoder extends CustomCoder<MyInteger> {
+ private static class MyIntegerCoder extends AtomicCoder<MyInteger> {
private static final MyIntegerCoder INSTANCE = new MyIntegerCoder();
private final VarIntCoder delegate = VarIntCoder.of();
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index e72c540..84f3d69 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -39,9 +39,9 @@ import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -505,7 +505,7 @@ public class ViewTest implements Serializable {
pipeline.run();
}
- private static class NonDeterministicStringCoder extends CustomCoder<String> {
+ private static class NonDeterministicStringCoder extends AtomicCoder<String> {
@Override
public void encode(String value, OutputStream outStream, Coder.Context context)
throws CoderException, IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 19b6092..f26dd59 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -35,8 +35,8 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
@@ -270,7 +270,7 @@ public class DoFnInvokersTest {
private abstract static class SomeRestrictionTracker
implements RestrictionTracker<SomeRestriction> {}
- private static class SomeRestrictionCoder extends CustomCoder<SomeRestriction> {
+ private static class SomeRestrictionCoder extends AtomicCoder<SomeRestriction> {
public static SomeRestrictionCoder of() {
return new SomeRestrictionCoder();
}
@@ -392,7 +392,7 @@ public class DoFnInvokersTest {
public void checkDone() throws IllegalStateException {}
}
- private static class CoderForDefaultTracker extends CustomCoder<RestrictionWithDefaultTracker> {
+ private static class CoderForDefaultTracker extends AtomicCoder<RestrictionWithDefaultTracker> {
public static CoderForDefaultTracker of() {
return new CoderForDefaultTracker();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
index 0db5355..7230a8b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
@@ -24,10 +24,10 @@ import static org.mockito.Mockito.mock;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.testing.CoderPropertiesTest.ClosingCoder;
import org.junit.Rule;
import org.junit.Test;
@@ -44,7 +44,7 @@ public class CoderUtilsTest {
@Rule
public transient ExpectedException expectedException = ExpectedException.none();
- static class TestCoder extends CustomCoder<Integer> {
+ static class TestCoder extends AtomicCoder<Integer> {
public static TestCoder of() {
return new TestCoder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
index 2d5baf2..6ba1d4a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
@@ -25,9 +25,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -85,7 +85,7 @@ public class SerializableUtilsTest {
}
/** A {@link Coder} that is not serializable by Java. */
- private static class UnserializableCoderByJava extends CustomCoder<Object> {
+ private static class UnserializableCoderByJava extends AtomicCoder<Object> {
private final Object unserializableField = new Object();
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
index d4e6f63..0781cf1 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
@@ -22,9 +22,9 @@ import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* <p>When this code is used in a nested {@link Coder.Context}, the serialized {@link ByteString}
* objects are first delimited by their size.
*/
-public class ByteStringCoder extends CustomCoder<ByteString> {
+public class ByteStringCoder extends AtomicCoder<ByteString> {
public static ByteStringCoder of() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
index 26b4b56..f04e9b9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
@@ -25,7 +25,7 @@ import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -34,11 +34,14 @@ import org.apache.beam.sdk.coders.VarIntCoder;
*/
@VisibleForTesting
class ShardedKeyCoder<KeyT>
- extends CustomCoder<ShardedKey<KeyT>> {
+ extends StructuredCoder<ShardedKey<KeyT>> {
public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
return new ShardedKeyCoder<>(keyCoder);
}
+ private final Coder<KeyT> keyCoder;
+ private final VarIntCoder shardNumberCoder;
+
protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
this.keyCoder = keyCoder;
this.shardNumberCoder = VarIntCoder.of();
@@ -68,7 +71,4 @@ class ShardedKeyCoder<KeyT>
public void verifyDeterministic() throws NonDeterministicException {
keyCoder.verifyDeterministic();
}
-
- Coder<KeyT> keyCoder;
- VarIntCoder shardNumberCoder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index 8a06d13..3059e2a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -21,12 +21,12 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
/** A coder for {@link TableDestination} objects. */
-public class TableDestinationCoder extends CustomCoder<TableDestination> {
+public class TableDestinationCoder extends AtomicCoder<TableDestination> {
private static final TableDestinationCoder INSTANCE = new TableDestinationCoder();
private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index c3e48a4..2b1988a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -22,15 +22,15 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
/**
* Defines a coder for {@link TableRowInfo} objects.
*/
@VisibleForTesting
-class TableRowInfoCoder extends CustomCoder<TableRowInfo> {
+class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder();
public static TableRowInfoCoder of() {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index a1ca41b..7ca8958 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -23,15 +23,15 @@ import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format.
*/
-public class TableRowJsonCoder extends CustomCoder<TableRow> {
+public class TableRowJsonCoder extends AtomicCoder<TableRow> {
public static TableRowJsonCoder of() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 70aa135..b896083 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -26,12 +26,14 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -71,7 +73,7 @@ class WriteBundlesToFiles<DestinationT>
}
/** a coder for the {@link Result} class. */
- public static class ResultCoder<DestinationT> extends CustomCoder<Result<DestinationT>> {
+ public static class ResultCoder<DestinationT> extends StructuredCoder<Result<DestinationT>> {
private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
private static final VarLongCoder longCoder = VarLongCoder.of();
private final Coder<DestinationT> destinationCoder;
@@ -105,6 +107,11 @@ class WriteBundlesToFiles<DestinationT>
}
@Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.singletonList(destinationCoder);
+ }
+
+ @Override
public void verifyDeterministic() {}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 031d9a0..9f04a6c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -31,11 +31,11 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.NullableCoder;
@@ -100,7 +100,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
/**
* Coder for conveying outgoing messages between internal stages.
*/
- private static class OutgoingMessageCoder extends CustomCoder<OutgoingMessage> {
+ private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
private static final NullableCoder<String> RECORD_ID_CODER =
NullableCoder.of(StringUtf8Coder.of());
private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index c2cbe73..c16b8fb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -46,8 +46,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -118,7 +118,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
/**
* Coder for checkpoints.
*/
- private static final PubsubCheckpointCoder CHECKPOINT_CODER = new PubsubCheckpointCoder();
+ private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = PubsubCheckpointCoder.of();
/**
* Maximum number of messages per pull.
@@ -357,11 +357,17 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
}
/** The coder for our checkpoints. */
- private static class PubsubCheckpointCoder extends CustomCoder<PubsubCheckpoint> {
+ private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint> {
private static final Coder<String> SUBSCRIPTION_PATH_CODER =
NullableCoder.of(StringUtf8Coder.of());
private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());
+ public static <T> PubsubCheckpointCoder<T> of() {
+ return new PubsubCheckpointCoder<>();
+ }
+
+ private PubsubCheckpointCoder() {}
+
@Override
public void encode(PubsubCheckpoint value, OutputStream outStream, Context context)
throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index a3b21ee..6c15f87 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -73,10 +73,10 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -720,7 +720,7 @@ public class BigQueryIOTest implements Serializable {
/**
* Coder for @link{PartitionedGlobalWindow}.
*/
- private static class PartitionedGlobalWindowCoder extends CustomCoder<PartitionedGlobalWindow> {
+ private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> {
@Override
public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context)
throws IOException, CoderException {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
index 9589fb1..15877b0 100644
--- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
@@ -92,4 +93,21 @@ public class WritableCoder<T extends Writable> extends CustomCoder<T> {
"Hadoop Writable may be non-deterministic.");
}
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ }
+ if (!(other instanceof WritableCoder)) {
+ return false;
+ }
+ WritableCoder<?> that = (WritableCoder<?>) other;
+ return Objects.equals(this.type, that.type);
+ }
+
+ @Override
+ public int hashCode() {
+ return type.hashCode();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index 35a8863..7cc043c 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -21,8 +21,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.Mut
* A {@link Coder} that serializes and deserializes the {@link Mutation} objects using {@link
* ProtobufUtil}.
*/
-class HBaseMutationCoder extends CustomCoder<Mutation> implements Serializable {
+class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
private static final HBaseMutationCoder INSTANCE = new HBaseMutationCoder();
private HBaseMutationCoder() {}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
index 0004d03..24a5f7f 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
@@ -21,8 +21,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
* A {@link Coder} that serializes and deserializes the {@link Result} objects using {@link
* ProtobufUtil}.
*/
-class HBaseResultCoder extends CustomCoder<Result> implements Serializable {
+class HBaseResultCoder extends AtomicCoder<Result> implements Serializable {
private static final HBaseResultCoder INSTANCE = new HBaseResultCoder();
private HBaseResultCoder() {}
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index f4de76a..e676455 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -55,11 +55,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.io.Read.Unbounded;
@@ -1595,11 +1595,11 @@ public class KafkaIO {
}
}
- private static class NullOnlyCoder<T> extends CustomCoder<T> {
+ private static class NullOnlyCoder<T> extends AtomicCoder<T> {
@Override
public void encode(T value, OutputStream outStream, Context context) {
checkArgument(value == null, "Can only encode nulls");
- // Encode as the empty string.
+ // Encode as no bytes.
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 160e8ce..0f5dc4c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -23,9 +23,9 @@ import java.io.OutputStream;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.values.KV;
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.KV;
/**
* {@link Coder} for {@link KafkaRecord}.
*/
-public class KafkaRecordCoder<K, V> extends CustomCoder<KafkaRecord<K, V>> {
+public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
private static final VarLongCoder longCoder = VarLongCoder.of();
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index 4da2b05..77fe127 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -21,9 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -32,7 +32,7 @@ import org.joda.time.Instant;
/**
* A {@link Coder} for {@link KinesisRecord}.
*/
-class KinesisRecordCoder extends CustomCoder<KinesisRecord> {
+class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
private static final InstantCoder INSTANT_CODER = InstantCoder.of();
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
index 812bc70..75a4619 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -23,6 +23,7 @@ import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Objects;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
@@ -139,6 +140,23 @@ public class JAXBCoder<T> extends CustomCoder<T> {
return TypeDescriptor.of(jaxbClass);
}
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ }
+ if (!(other instanceof JAXBCoder)) {
+ return false;
+ }
+ JAXBCoder<?> that = (JAXBCoder<?>) other;
+ return Objects.equals(this.jaxbClass, that.jaxbClass);
+ }
+
+ @Override
+ public int hashCode() {
+ return jaxbClass.hashCode();
+ }
+
private static class CloseIgnoringInputStream extends FilterInputStream {
protected CloseIgnoringInputStream(InputStream in) {
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
index 940d596..2b4503a 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.CoderProperties;
@@ -171,7 +171,7 @@ public class JAXBCoderTest {
/**
* A coder that surrounds the value with two values, to demonstrate nesting.
*/
- private static class TestCoder extends CustomCoder<TestType> {
+ private static class TestCoder extends StructuredCoder<TestType> {
private final JAXBCoder<TestType> jaxbCoder;
public TestCoder(JAXBCoder<TestType> jaxbCoder) {
this.jaxbCoder = jaxbCoder;
[6/6] beam git commit: Add default implementations of Coder methods
to Coder
Posted by tg...@apache.org.
Add default implementations of Coder methods to Coder
Remove from StructuredCoder. These are sensible defaults implemented in
terms of other Coder methods.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63258c69
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63258c69
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63258c69
Branch: refs/heads/master
Commit: 63258c6986866b5bef58043b056d5c0dfec7303f
Parents: 655947b
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 10:10:40 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 13:30:51 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/coders/Coder.java | 121 ++++++++++++++++---
.../apache/beam/sdk/coders/StructuredCoder.java | 67 ++--------
2 files changed, 108 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/63258c69/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 061e9e5..41e83ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -22,6 +22,9 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -206,6 +209,30 @@ public abstract class Coder<T> implements Serializable {
public abstract void verifyDeterministic() throws Coder.NonDeterministicException;
/**
+ * Verifies all of the provided coders are deterministic. If any are not, throws a {@link
+ * NonDeterministicException} for the {@code target} {@link Coder}.
+ */
+ public static void verifyDeterministic(Coder<?> target, String message, Iterable<Coder<?>> coders)
+ throws NonDeterministicException {
+ for (Coder<?> coder : coders) {
+ try {
+ coder.verifyDeterministic();
+ } catch (NonDeterministicException e) {
+ throw new NonDeterministicException(target, message, e);
+ }
+ }
+ }
+
+ /**
+ * Verifies all of the provided coders are deterministic. If any are not, throws a {@link
+ * NonDeterministicException} for the {@code target} {@link Coder}.
+ */
+ public static void verifyDeterministic(Coder<?> target, String message, Coder<?>... coders)
+ throws NonDeterministicException {
+ verifyDeterministic(target, message, Arrays.asList(coders));
+ }
+
+ /**
* Returns {@code true} if this {@link Coder} is injective with respect to {@link Objects#equals}.
*
* <p>Whenever the encoded bytes of two values are equal, then the original values are equal
@@ -214,28 +241,50 @@ public abstract class Coder<T> implements Serializable {
* <p>This condition is most notably false for arrays. More generally, this condition is false
* whenever {@code equals()} compares object identity, rather than performing a
* semantic/structural comparison.
+ *
+ * <p>By default, returns false.
*/
- public abstract boolean consistentWithEquals();
+ public boolean consistentWithEquals() {
+ return false;
+ }
/**
- * Returns an object with an {@code Object.equals()} method that represents structural equality
- * on the argument.
+ * Returns an object with an {@code Object.equals()} method that represents structural equality on
+ * the argument.
*
* <p>For any two values {@code x} and {@code y} of type {@code T}, if their encoded bytes are the
* same, then it must be the case that {@code structuralValue(x).equals(@code structuralValue(y)}.
*
* <p>Most notably:
+ *
* <ul>
* <li>The structural value for an array coder should perform a structural comparison of the
- * contents of the arrays, rather than the default behavior of comparing according to object
- * identity.
- * <li>The structural value for a coder accepting {@code null} should be a proper object with
- * an {@code equals()} method, even if the input value is {@code null}.
+ * contents of the arrays, rather than the default behavior of comparing according to object
+ * identity.
+ * <li>The structural value for a coder accepting {@code null} should be a proper object with an
+ * {@code equals()} method, even if the input value is {@code null}.
* </ul>
*
* <p>See also {@link #consistentWithEquals()}.
+ *
+ * <p>By default, if this coder is {@link #consistentWithEquals()}, and the value is not null,
+ * returns the provided object. Otherwise, encodes the value into a {@code byte[]}, and returns
+ * an object that performs array equality on the encoded bytes.
*/
- public abstract Object structuralValue(T value);
+ public Object structuralValue(T value) {
+ if (value != null && consistentWithEquals()) {
+ return value;
+ } else {
+ try {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ encode(value, os, Context.OUTER);
+ return new StructuralByteArray(os.toByteArray());
+ } catch (Exception exn) {
+ throw new IllegalArgumentException(
+ "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+ }
+ }
+ }
/**
* Returns whether {@link #registerByteSizeObserver} cheap enough to
@@ -246,21 +295,44 @@ public abstract class Coder<T> implements Serializable {
* <p>Not intended to be called by user code, but instead by
* {@link PipelineRunner}
* implementations.
+ *
+ * <p>By default, returns false. The default {@link #registerByteSizeObserver} implementation
+ * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
+ * unless it is overridden. This is considered expensive.
*/
- public abstract boolean isRegisterByteSizeObserverCheap(T value);
+ public boolean isRegisterByteSizeObserverCheap(T value) {
+ return isRegisterByteSizeObserverCheap(value, Context.NESTED);
+ }
/**
- * Returns whether {@link #registerByteSizeObserver} cheap enough to
- * call for every element, that is, if this {@code Coder} can
- * calculate the byte size of the element to be coded in roughly
- * constant time (or lazily).
+ * {@inheritDoc}
*
* <p>Not intended to be called by user code, but instead by
* {@link PipelineRunner}
* implementations.
+ *
+ * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver}
+ * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
+ * unless it is overridden. This is considered expensive.
*/
@Deprecated
- public abstract boolean isRegisterByteSizeObserverCheap(T value, Context context);
+ public boolean isRegisterByteSizeObserverCheap(T value, Context context) {
+ return false;
+ }
+
+ /**
+ * Returns the size in bytes of the encoded value using this coder.
+ */
+ protected long getEncodedElementByteSize(T value, Context context)
+ throws Exception {
+ try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
+ encode(value, os, context);
+ return os.getCount();
+ } catch (Exception exn) {
+ throw new IllegalArgumentException(
+ "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+ }
+ }
/**
* Notifies the {@code ElementByteSizeObserver} about the byte size
@@ -269,10 +341,14 @@ public abstract class Coder<T> implements Serializable {
* <p>Not intended to be called by user code, but instead by
* {@link PipelineRunner}
* implementations.
+ *
+ * <p>By default, this notifies {@code observer} about the byte size
+ * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}.
*/
- public abstract void registerByteSizeObserver(
- T value, ElementByteSizeObserver observer)
- throws Exception;
+ public void registerByteSizeObserver(T value, ElementByteSizeObserver observer)
+ throws Exception {
+ registerByteSizeObserver(value, observer, Context.NESTED);
+ }
/**
* Notifies the {@code ElementByteSizeObserver} about the byte size
@@ -283,15 +359,20 @@ public abstract class Coder<T> implements Serializable {
* implementations.
*/
@Deprecated
- public abstract void registerByteSizeObserver(
+ public void registerByteSizeObserver(
T value, ElementByteSizeObserver observer, Context context)
- throws Exception;
+ throws Exception {
+ observer.update(getEncodedElementByteSize(value, context));
+ }
/**
* Returns the {@link TypeDescriptor} for the type encoded.
*/
@Experimental(Kind.CODER_TYPE_ENCODING)
- public abstract TypeDescriptor<T> getEncodedTypeDescriptor();
+ public TypeDescriptor<T> getEncodedTypeDescriptor(){
+ return (TypeDescriptor<T>)
+ TypeDescriptor.of(getClass()).resolveType(new TypeDescriptor<T>() {}.getType());
+ }
/**
* Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is
http://git-wip-us.apache.org/repos/asf/beam/blob/63258c69/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
index 0c72618..437f10d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -26,13 +24,15 @@ import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
* An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing
* via the class name and recursively using {@link #getComponents}.
*
+ * <p>A {@link StructuredCoder} should be defined purely in terms of its component coders, and
+ * contain no additional configuration.
+ *
* <p>To extend {@link StructuredCoder}, override the following methods as appropriate:
*
* <ul>
@@ -101,12 +101,14 @@ public abstract class StructuredCoder<T> extends Coder<T> {
return builder.toString();
}
+ @Override
public void encode(T value, OutputStream outStream)
throws CoderException, IOException {
encode(value, outStream, Coder.Context.NESTED);
}
@Deprecated
+ @Override
public void encodeOuter(T value, OutputStream outStream)
throws CoderException, IOException {
encode(value, outStream, Coder.Context.OUTER);
@@ -122,11 +124,13 @@ public abstract class StructuredCoder<T> extends Coder<T> {
}
}
+ @Override
public T decode(InputStream inStream) throws CoderException, IOException {
return decode(inStream, Coder.Context.NESTED);
}
@Deprecated
+ @Override
public T decodeOuter(InputStream inStream) throws CoderException, IOException {
return decode(inStream, Coder.Context.OUTER);
}
@@ -141,63 +145,6 @@ public abstract class StructuredCoder<T> extends Coder<T> {
}
}
- /**
- * {@inheritDoc}
- *
- * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver}
- * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
- * unless it is overridden. This is considered expensive.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(T value) {
- return isRegisterByteSizeObserverCheap(value, Context.NESTED);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver}
- * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
- * unless it is overridden. This is considered expensive.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(T value, Context context) {
- return false;
- }
-
- /**
- * Returns the size in bytes of the encoded value using this coder.
- */
- protected long getEncodedElementByteSize(T value, Context context)
- throws Exception {
- try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
- encode(value, os, context);
- return os.getCount();
- } catch (Exception exn) {
- throw new IllegalArgumentException(
- "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
- }
- }
-
- @Override
- public void registerByteSizeObserver(T value, ElementByteSizeObserver observer)
- throws Exception {
- registerByteSizeObserver(value, observer, Context.NESTED);
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>For {@link StructuredCoder} subclasses, this notifies {@code observer} about the byte size
- * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}.
- */
- @Override
- public void registerByteSizeObserver(
- T value, ElementByteSizeObserver observer, Context context)
- throws Exception {
- observer.update(getEncodedElementByteSize(value, context));
- }
-
protected void verifyDeterministic(String message, Iterable<Coder<?>> coders)
throws NonDeterministicException {
for (Coder<?> coder : coders) {
[5/6] beam git commit: Make CustomCoder extend Coder directly
Posted by tg...@apache.org.
Make CustomCoder extend Coder directly
CustomCoder in general consists of configuration beyond component
coders, and should not extend, for example, equality methods of
StrucutredCoder.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c7f3e3c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c7f3e3c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c7f3e3c3
Branch: refs/heads/master
Commit: c7f3e3c3a00423dfcaca6e5b53ad4f84533a56f6
Parents: 987c2cb
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 10:13:41 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 13:30:51 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/CustomCoder.java | 50 +++++++++++++++++++-
1 file changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c7f3e3c3/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index 87bd531..f33e210 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.coders;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
@@ -33,8 +36,53 @@ import java.util.List;
*
* @param <T> the type of elements handled by this coder
*/
-public abstract class CustomCoder<T> extends StructuredCoder<T>
+public abstract class CustomCoder<T> extends Coder<T>
implements Serializable {
+
+ @Override
+ public void encode(T value, OutputStream outStream)
+ throws CoderException, IOException {
+ encode(value, outStream, Coder.Context.NESTED);
+ }
+
+ @Deprecated
+ @Override
+ public void encodeOuter(T value, OutputStream outStream)
+ throws CoderException, IOException {
+ encode(value, outStream, Coder.Context.OUTER);
+ }
+
+ @Deprecated
+ public void encode(T value, OutputStream outStream, Coder.Context context)
+ throws CoderException, IOException {
+ if (context == Coder.Context.NESTED) {
+ encode(value, outStream);
+ } else {
+ encodeOuter(value, outStream);
+ }
+ }
+
+ @Override
+ public T decode(InputStream inStream) throws CoderException, IOException {
+ return decode(inStream, Coder.Context.NESTED);
+ }
+
+ @Deprecated
+ @Override
+ public T decodeOuter(InputStream inStream) throws CoderException, IOException {
+ return decode(inStream, Coder.Context.OUTER);
+ }
+
+ @Deprecated
+ public T decode(InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ if (context == Coder.Context.NESTED) {
+ return decode(inStream);
+ } else {
+ return decodeOuter(inStream);
+ }
+ }
+
/**
* {@inheritDoc}.
*