You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/05 20:35:21 UTC

[1/6] beam git commit: This closes #2829

Repository: beam
Updated Branches:
  refs/heads/master 86a94998e -> 0490d6b36


This closes #2829


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0490d6b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0490d6b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0490d6b3

Branch: refs/heads/master
Commit: 0490d6b36cfbfe11f3c12873164bae270b4b5232
Parents: 86a9499 c7f3e3c
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 13:30:51 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 13:30:51 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/ApexStreamTuple.java |   5 +-
 .../UnboundedReadFromBoundedSource.java         |   4 +-
 .../runners/core/construction/CodersTest.java   |  14 +--
 .../core/construction/PCollectionsTest.java     |   3 +-
 .../core/ElementAndRestrictionCoder.java        |  17 ++-
 .../beam/runners/core/KeyedWorkItemCoder.java   |   4 +-
 .../beam/runners/core/TimerInternals.java       |   6 +-
 .../direct/CloningBundleFactoryTest.java        |  10 +-
 .../beam/runners/direct/DirectRunnerTest.java   |   5 +-
 .../UnboundedReadEvaluatorFactoryTest.java      |   4 +-
 .../streaming/SingletonKeyedWorkItemCoder.java  |   4 +-
 .../runners/dataflow/BatchViewOverrides.java    |   8 +-
 .../runners/dataflow/internal/IsmFormat.java    |  40 ++++--
 .../runners/dataflow/util/RandomAccessData.java |   4 +-
 .../org/apache/beam/sdk/coders/AtomicCoder.java |  86 +++++++++++++
 .../org/apache/beam/sdk/coders/AvroCoder.java   |  19 +++
 .../apache/beam/sdk/coders/BigDecimalCoder.java |   2 +-
 .../beam/sdk/coders/BigEndianIntegerCoder.java  |   2 +-
 .../beam/sdk/coders/BigEndianLongCoder.java     |   4 +-
 .../apache/beam/sdk/coders/BigIntegerCoder.java |   4 +-
 .../org/apache/beam/sdk/coders/BitSetCoder.java |   4 +-
 .../apache/beam/sdk/coders/ByteArrayCoder.java  |   6 +-
 .../org/apache/beam/sdk/coders/ByteCoder.java   |   2 +-
 .../java/org/apache/beam/sdk/coders/Coder.java  | 121 ++++++++++++++++---
 .../apache/beam/sdk/coders/CoderFactories.java  |   9 +-
 .../org/apache/beam/sdk/coders/CustomCoder.java |  50 +++++++-
 .../org/apache/beam/sdk/coders/DoubleCoder.java |   2 +-
 .../apache/beam/sdk/coders/DurationCoder.java   |   2 +-
 .../apache/beam/sdk/coders/InstantCoder.java    |   2 +-
 .../org/apache/beam/sdk/coders/KvCoder.java     |   4 +-
 .../org/apache/beam/sdk/coders/ListCoder.java   |   3 +-
 .../org/apache/beam/sdk/coders/MapCoder.java    |   2 +-
 .../apache/beam/sdk/coders/NullableCoder.java   |   6 +-
 .../apache/beam/sdk/coders/StringUtf8Coder.java |   2 +-
 .../apache/beam/sdk/coders/StructuredCoder.java |  67 ++--------
 .../beam/sdk/coders/TextualIntegerCoder.java    |   2 +-
 .../org/apache/beam/sdk/coders/VarIntCoder.java |   2 +-
 .../org/apache/beam/sdk/coders/VoidCoder.java   |   4 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   |   4 +-
 .../sdk/transforms/ApproximateQuantiles.java    |  26 +++-
 .../org/apache/beam/sdk/transforms/Combine.java |  12 +-
 .../apache/beam/sdk/transforms/CombineFns.java  |   4 +-
 .../org/apache/beam/sdk/transforms/Count.java   |   4 +-
 .../org/apache/beam/sdk/transforms/Mean.java    |   7 +-
 .../org/apache/beam/sdk/transforms/Top.java     |  23 +++-
 .../beam/sdk/transforms/join/CoGbkResult.java   |   2 +-
 .../beam/sdk/transforms/join/UnionCoder.java    |   7 +-
 .../beam/sdk/transforms/windowing/PaneInfo.java |  10 +-
 .../org/apache/beam/sdk/util/BitSetCoder.java   |   9 +-
 .../org/apache/beam/sdk/util/WindowedValue.java |   7 +-
 .../beam/sdk/values/TimestampedValue.java       |  13 +-
 .../beam/sdk/values/ValueInSingleWindow.java    |   4 +-
 .../beam/sdk/values/ValueWithRecordId.java      |   4 +-
 .../beam/sdk/coders/CoderRegistryTest.java      |   4 +-
 .../beam/sdk/coders/DelegateCoderTest.java      |  25 ----
 .../beam/sdk/coders/NullableCoderTest.java      |   2 +-
 .../beam/sdk/testing/CoderPropertiesTest.java   |  37 +++++-
 .../apache/beam/sdk/testing/PAssertTest.java    |   4 +-
 .../sdk/testing/SerializableMatchersTest.java   |   4 +-
 .../beam/sdk/testing/WindowSupplierTest.java    |   6 +-
 .../beam/sdk/transforms/CombineFnsTest.java     |   4 +-
 .../apache/beam/sdk/transforms/CombineTest.java |   6 +-
 .../apache/beam/sdk/transforms/CreateTest.java  |   6 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |   4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |   6 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |   4 +-
 .../transforms/reflect/DoFnInvokersTest.java    |   6 +-
 .../apache/beam/sdk/util/CoderUtilsTest.java    |   4 +-
 .../beam/sdk/util/SerializableUtilsTest.java    |   4 +-
 .../extensions/protobuf/ByteStringCoder.java    |   4 +-
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    |  10 +-
 .../io/gcp/bigquery/TableDestinationCoder.java  |   4 +-
 .../sdk/io/gcp/bigquery/TableRowInfoCoder.java  |   4 +-
 .../sdk/io/gcp/bigquery/TableRowJsonCoder.java  |   4 +-
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  11 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |   4 +-
 .../io/gcp/pubsub/PubsubUnboundedSource.java    |  12 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   4 +-
 .../beam/sdk/io/hadoop/WritableCoder.java       |  18 +++
 .../beam/sdk/io/hbase/HBaseMutationCoder.java   |   4 +-
 .../beam/sdk/io/hbase/HBaseResultCoder.java     |   4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |   6 +-
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     |   4 +-
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |   4 +-
 .../org/apache/beam/sdk/io/xml/JAXBCoder.java   |  18 +++
 .../apache/beam/sdk/io/xml/JAXBCoderTest.java   |   4 +-
 86 files changed, 612 insertions(+), 304 deletions(-)
----------------------------------------------------------------------



[3/6] beam git commit: Reparent many Coders to Atomic or StructuredCoder

Posted by tg...@apache.org.
Reparent many Coders to Atomic or StructuredCoder

These coders either take no configuration or are configured only in
terms of other Coders, which makes them appropriate to reparent.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/987c2cbc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/987c2cbc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/987c2cbc

Branch: refs/heads/master
Commit: 987c2cbca433f3e0ff12a637f2b0474e772c6beb
Parents: 63258c6
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 10:13:05 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 13:30:51 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/ApexStreamTuple.java |  5 ++-
 .../UnboundedReadFromBoundedSource.java         |  4 +-
 .../runners/core/construction/CodersTest.java   | 14 +------
 .../core/construction/PCollectionsTest.java     |  3 +-
 .../core/ElementAndRestrictionCoder.java        | 17 ++++++++-
 .../beam/runners/core/KeyedWorkItemCoder.java   |  4 +-
 .../beam/runners/core/TimerInternals.java       |  6 +--
 .../direct/CloningBundleFactoryTest.java        | 10 ++---
 .../beam/runners/direct/DirectRunnerTest.java   |  5 +--
 .../UnboundedReadEvaluatorFactoryTest.java      |  4 +-
 .../streaming/SingletonKeyedWorkItemCoder.java  |  4 +-
 .../runners/dataflow/BatchViewOverrides.java    |  8 ++--
 .../runners/dataflow/internal/IsmFormat.java    | 40 ++++++++++++++++----
 .../runners/dataflow/util/RandomAccessData.java |  4 +-
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 19 ++++++++++
 .../apache/beam/sdk/coders/BigDecimalCoder.java |  2 +-
 .../beam/sdk/coders/BigEndianIntegerCoder.java  |  2 +-
 .../beam/sdk/coders/BigEndianLongCoder.java     |  4 +-
 .../apache/beam/sdk/coders/BigIntegerCoder.java |  4 +-
 .../org/apache/beam/sdk/coders/BitSetCoder.java |  4 +-
 .../apache/beam/sdk/coders/ByteArrayCoder.java  |  6 +--
 .../org/apache/beam/sdk/coders/ByteCoder.java   |  2 +-
 .../apache/beam/sdk/coders/CoderFactories.java  |  9 +++--
 .../org/apache/beam/sdk/coders/DoubleCoder.java |  2 +-
 .../apache/beam/sdk/coders/DurationCoder.java   |  2 +-
 .../apache/beam/sdk/coders/InstantCoder.java    |  2 +-
 .../org/apache/beam/sdk/coders/KvCoder.java     |  4 +-
 .../org/apache/beam/sdk/coders/ListCoder.java   |  3 +-
 .../org/apache/beam/sdk/coders/MapCoder.java    |  2 +-
 .../apache/beam/sdk/coders/NullableCoder.java   |  6 +--
 .../apache/beam/sdk/coders/StringUtf8Coder.java |  2 +-
 .../beam/sdk/coders/TextualIntegerCoder.java    |  2 +-
 .../org/apache/beam/sdk/coders/VarIntCoder.java |  2 +-
 .../org/apache/beam/sdk/coders/VoidCoder.java   |  4 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  4 +-
 .../sdk/transforms/ApproximateQuantiles.java    | 26 ++++++++++---
 .../org/apache/beam/sdk/transforms/Combine.java | 12 ++++--
 .../apache/beam/sdk/transforms/CombineFns.java  |  4 +-
 .../org/apache/beam/sdk/transforms/Count.java   |  4 +-
 .../org/apache/beam/sdk/transforms/Mean.java    |  7 ++--
 .../org/apache/beam/sdk/transforms/Top.java     | 23 ++++++++++-
 .../beam/sdk/transforms/join/CoGbkResult.java   |  2 +-
 .../beam/sdk/transforms/join/UnionCoder.java    |  7 ++--
 .../beam/sdk/transforms/windowing/PaneInfo.java | 10 ++++-
 .../org/apache/beam/sdk/util/BitSetCoder.java   |  9 +++--
 .../org/apache/beam/sdk/util/WindowedValue.java |  7 ++--
 .../beam/sdk/values/TimestampedValue.java       | 13 +++++--
 .../beam/sdk/values/ValueInSingleWindow.java    |  4 +-
 .../beam/sdk/values/ValueWithRecordId.java      |  4 +-
 .../beam/sdk/coders/CoderRegistryTest.java      |  4 +-
 .../beam/sdk/coders/DelegateCoderTest.java      | 25 ------------
 .../beam/sdk/coders/NullableCoderTest.java      |  2 +-
 .../beam/sdk/testing/CoderPropertiesTest.java   | 37 ++++++++++++++++--
 .../apache/beam/sdk/testing/PAssertTest.java    |  4 +-
 .../sdk/testing/SerializableMatchersTest.java   |  4 +-
 .../beam/sdk/testing/WindowSupplierTest.java    |  6 +--
 .../beam/sdk/transforms/CombineFnsTest.java     |  4 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  6 +--
 .../apache/beam/sdk/transforms/CreateTest.java  |  6 +--
 .../beam/sdk/transforms/GroupByKeyTest.java     |  4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  6 +--
 .../apache/beam/sdk/transforms/ViewTest.java    |  4 +-
 .../transforms/reflect/DoFnInvokersTest.java    |  6 +--
 .../apache/beam/sdk/util/CoderUtilsTest.java    |  4 +-
 .../beam/sdk/util/SerializableUtilsTest.java    |  4 +-
 .../extensions/protobuf/ByteStringCoder.java    |  4 +-
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    | 10 ++---
 .../io/gcp/bigquery/TableDestinationCoder.java  |  4 +-
 .../sdk/io/gcp/bigquery/TableRowInfoCoder.java  |  4 +-
 .../sdk/io/gcp/bigquery/TableRowJsonCoder.java  |  4 +-
 .../io/gcp/bigquery/WriteBundlesToFiles.java    | 11 +++++-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |  4 +-
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 12 ++++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  4 +-
 .../beam/sdk/io/hadoop/WritableCoder.java       | 18 +++++++++
 .../beam/sdk/io/hbase/HBaseMutationCoder.java   |  4 +-
 .../beam/sdk/io/hbase/HBaseResultCoder.java     |  4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  6 +--
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     |  4 +-
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  4 +-
 .../org/apache/beam/sdk/io/xml/JAXBCoder.java   | 18 +++++++++
 .../apache/beam/sdk/io/xml/JAXBCoderTest.java   |  4 +-
 82 files changed, 369 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
index 4ce351b..4aa6ee8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
@@ -31,7 +31,7 @@ import java.util.Objects;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 
 /**
  * The common interface for all objects transmitted through streams.
@@ -149,7 +149,7 @@ public interface ApexStreamTuple<T> {
   /**
    * Coder for {@link ApexStreamTuple}.
    */
-  class ApexStreamTupleCoder<T> extends CustomCoder<ApexStreamTuple<T>> {
+  class ApexStreamTupleCoder<T> extends StructuredCoder<ApexStreamTuple<T>> {
     private static final long serialVersionUID = 1L;
     final Coder<T> valueCoder;
 
@@ -194,6 +194,7 @@ public interface ApexStreamTuple<T> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
+          this,
           this.getClass().getSimpleName() + " requires a deterministic valueCoder",
           valueCoder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 0ea13b8..1424b8b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -35,10 +35,10 @@ import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.Read;
@@ -203,7 +203,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     }
 
     @VisibleForTesting
-    static class CheckpointCoder<T> extends CustomCoder<Checkpoint<T>> {
+    static class CheckpointCoder<T> extends StructuredCoder<Checkpoint<T>> {
 
       // The coder for a list of residual elements and their timestamps
       private final Coder<List<TimestampedValue<T>>> elemsCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index 32a78fa..765723c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -30,11 +30,11 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
@@ -149,7 +149,7 @@ public class CodersTest {
 
     static class Record implements Serializable {}
 
-    private static class RecordCoder extends CustomCoder<Record> {
+    private static class RecordCoder extends AtomicCoder<Record> {
       @Override
       public void encode(Record value, OutputStream outStream, Context context)
           throws CoderException, IOException {}
@@ -159,16 +159,6 @@ public class CodersTest {
           throws CoderException, IOException {
         return new Record();
       }
-
-      @Override
-      public boolean equals(Object other) {
-        return other != null && getClass().equals(other.getClass());
-      }
-
-      @Override
-      public int hashCode() {
-        return getClass().hashCode();
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
index c177c58..2c45cbd 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Collections;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -158,7 +159,7 @@ public class PCollectionsTest {
 
     @Override
     public Coder<BoundedWindow> windowCoder() {
-      return new CustomCoder<BoundedWindow>() {
+      return new AtomicCoder<BoundedWindow>() {
         @Override public void verifyDeterministic() {}
 
         @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 64c1e14..83c4e62 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -17,18 +17,20 @@
  */
 package org.apache.beam.runners.core;
 
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 
 /** A {@link Coder} for {@link ElementAndRestriction}. */
 @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
 public class ElementAndRestrictionCoder<ElementT, RestrictionT>
-    extends CustomCoder<ElementAndRestriction<ElementT, RestrictionT>> {
+    extends StructuredCoder<ElementAndRestriction<ElementT, RestrictionT>> {
   private final Coder<ElementT> elementCoder;
   private final Coder<RestrictionT> restrictionCoder;
 
@@ -65,6 +67,17 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT>
     return ElementAndRestriction.of(key, value);
   }
 
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of(elementCoder, restrictionCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    elementCoder.verifyDeterministic();
+    restrictionCoder.verifyDeterministic();
+  }
+
   public Coder<ElementT> getElementCoder() {
     return elementCoder;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index fddf7fa..e1872b5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -26,8 +26,8 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 /**
  * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}.
  */
-public class KeyedWorkItemCoder<K, ElemT> extends CustomCoder<KeyedWorkItem<K, ElemT>> {
+public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem<K, ElemT>> {
   /**
    * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
    * coder.

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 21fe430..888c11f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -27,9 +27,9 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -224,7 +224,7 @@ public interface TimerInternals {
   /**
    * A {@link Coder} for {@link TimerData}.
    */
-  class TimerDataCoder extends CustomCoder<TimerData> {
+  class TimerDataCoder extends StructuredCoder<TimerData> {
     private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
     private static final InstantCoder INSTANT_CODER = InstantCoder.of();
     private final Coder<? extends BoundedWindow> windowCoder;
@@ -266,7 +266,7 @@ public interface TimerInternals {
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic("window coder must be deterministic", windowCoder);
+      verifyDeterministic(this, "window coder must be deterministic", windowCoder);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index 7d037d1..33d171e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -173,7 +173,7 @@ public class CloningBundleFactoryTest {
   }
 
   static class Record {}
-  static class RecordNoEncodeCoder extends CustomCoder<Record> {
+  static class RecordNoEncodeCoder extends AtomicCoder<Record> {
 
     @Override
     public void encode(
@@ -192,7 +192,7 @@ public class CloningBundleFactoryTest {
     }
   }
 
-  static class RecordNoDecodeCoder extends CustomCoder<Record> {
+  static class RecordNoDecodeCoder extends AtomicCoder<Record> {
     @Override
     public void encode(
         Record value,
@@ -208,7 +208,7 @@ public class CloningBundleFactoryTest {
     }
   }
 
-  private static class RecordStructuralValueCoder extends CustomCoder<Record> {
+  private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
     @Override
     public void encode(
         Record value,
@@ -240,7 +240,7 @@ public class CloningBundleFactoryTest {
   }
 
   private static class RecordNotConsistentWithEqualsStructuralValueCoder
-      extends CustomCoder<Record> {
+      extends AtomicCoder<Record> {
     @Override
     public void encode(
         Record value,

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 428c6fc..0fe9585 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -44,9 +44,9 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
@@ -523,8 +523,7 @@ public class DirectRunnerTest implements Serializable {
     p.run();
   }
 
-  private static class LongNoDecodeCoder extends CustomCoder<Long> {
-
+  private static class LongNoDecodeCoder extends AtomicCoder<Long> {
     @Override
     public void encode(
         Long value, OutputStream outStream, Context context) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index ceb078b..b9ba7f4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -46,10 +46,10 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
 import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
@@ -586,7 +586,7 @@ public class UnboundedReadEvaluatorFactoryTest {
       return finalized;
     }
 
-    public static class Coder extends CustomCoder<TestCheckpointMark> {
+    public static class Coder extends AtomicCoder<TestCheckpointMark> {
       @Override
       public void encode(
           TestCheckpointMark value,

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index c73700f..f218693 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -26,7 +26,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.util.WindowedValue;
  * Singleton keyed work item coder.
  */
 public class SingletonKeyedWorkItemCoder<K, ElemT>
-    extends CustomCoder<SingletonKeyedWorkItem<K, ElemT>> {
+    extends StructuredCoder<SingletonKeyedWorkItem<K, ElemT>> {
   /**
    * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
    * coder.

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ef2bfed..ecd0365 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -50,11 +50,11 @@ import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -1335,7 +1335,7 @@ class BatchViewOverrides {
    * A {@link Coder} for {@link TransformedMap}s.
    */
   static class TransformedMapCoder<K, V1, V2>
-      extends CustomCoder<TransformedMap<K, V1, V2>> {
+      extends StructuredCoder<TransformedMap<K, V1, V2>> {
     private final Coder<Function<V1, V2>> transformCoder;
     private final Coder<Map<K, V1>> originalMapCoder;
 
@@ -1373,8 +1373,8 @@ class BatchViewOverrides {
     @Override
     public void verifyDeterministic()
         throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-      verifyDeterministic("Expected transform coder to be deterministic.", transformCoder);
-      verifyDeterministic("Expected map coder to be deterministic.", originalMapCoder);
+      verifyDeterministic(this, "Expected transform coder to be deterministic.", transformCoder);
+      verifyDeterministic(this, "Expected map coder to be deterministic.", originalMapCoder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index fbfe49a..aed514a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -32,14 +32,17 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.util.RandomAccessData;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.util.VarInt;
@@ -356,8 +359,9 @@ public class IsmFormat {
 
     @Override
     public void verifyDeterministic() throws Coder.NonDeterministicException {
-      verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
-      verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
+      verifyDeterministic(
+          this, "Key component coders expected to be deterministic.", keyComponentCoders);
+      verifyDeterministic(this, "Value coder expected to be deterministic.", valueCoder);
     }
 
     @Override
@@ -393,6 +397,28 @@ public class IsmFormat {
       }
       return super.structuralValue(record);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (!(other instanceof IsmRecordCoder)) {
+        return false;
+      }
+      IsmRecordCoder<?> that = (IsmRecordCoder<?>) other;
+      return Objects.equals(this.numberOfShardKeyCoders, that.numberOfShardKeyCoders)
+          && Objects.equals(
+              this.numberOfMetadataShardKeyCoders, that.numberOfMetadataShardKeyCoders)
+          && Objects.equals(this.keyComponentCoders, that.keyComponentCoders)
+          && Objects.equals(this.valueCoder, that.valueCoder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(
+          numberOfShardKeyCoders, numberOfMetadataShardKeyCoders, keyComponentCoders, valueCoder);
+    }
   }
 
   /**
@@ -450,7 +476,7 @@ public class IsmFormat {
    * A coder for metadata key component. Can be used to wrap key component coder allowing for
    * the metadata key component to be used as a place holder instead of an actual key.
    */
-  public static class MetadataKeyCoder<K> extends CustomCoder<K> {
+  public static class MetadataKeyCoder<K> extends StructuredCoder<K> {
     public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
       checkNotNull(keyCoder);
       return new MetadataKeyCoder<>(keyCoder);
@@ -497,7 +523,7 @@ public class IsmFormat {
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic("Expected key coder to be deterministic", keyCoder);
+      verifyDeterministic(this, "Expected key coder to be deterministic", keyCoder);
     }
   }
 
@@ -584,7 +610,7 @@ public class IsmFormat {
    *   <li>indexOffset (variable length long encoding)</li>
    * </ul>
    */
-  public static class IsmShardCoder extends CustomCoder<IsmShard> {
+  public static class IsmShardCoder extends AtomicCoder<IsmShard> {
     private static final IsmShardCoder INSTANCE = new IsmShardCoder();
 
     /** Returns an IsmShardCoder. */
@@ -649,7 +675,7 @@ public class IsmFormat {
   }
 
   /** A {@link Coder} for {@link KeyPrefix}. */
-  public static final class KeyPrefixCoder extends CustomCoder<KeyPrefix> {
+  public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
     private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
 
     public static KeyPrefixCoder of() {
@@ -721,7 +747,7 @@ public class IsmFormat {
   }
 
   /** A {@link Coder} for {@link Footer}. */
-  public static final class FooterCoder extends CustomCoder<Footer> {
+  public static final class FooterCoder extends AtomicCoder<Footer> {
     private static final FooterCoder INSTANCE = new FooterCoder();
 
     public static FooterCoder of() {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
index 66548e2..4e94515 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
@@ -30,10 +30,10 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Comparator;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.VarInt;
 
 /**
@@ -55,7 +55,7 @@ public class RandomAccessData {
    *
    * <p>This coder does not support encoding positive infinity.
    */
-  public static class RandomAccessDataCoder extends CustomCoder<RandomAccessData> {
+  public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
     private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
 
     public static RandomAccessDataCoder of() {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 1e01f1a..2aa2b44 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
@@ -681,4 +682,22 @@ public class AvroCoder<T> extends CustomCoder<T> {
       throw new IllegalArgumentException("Unable to get field " + name + " from " + originalClazz);
     }
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+    if (!(other instanceof AvroCoder)) {
+      return false;
+    }
+    AvroCoder<?> that = (AvroCoder<?>) other;
+    return Objects.equals(this.schemaSupplier.get(), that.schemaSupplier.get())
+        && Objects.equals(this.typeDescriptor, that.typeDescriptor);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(schemaSupplier.get(), typeDescriptor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index d628203..aadf085 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -32,7 +32,7 @@ import java.math.MathContext;
  * {@link BigInteger}, when scaled (with unlimited precision, aka {@link MathContext#UNLIMITED}),
  * yields the expected {@link BigDecimal}.
  */
-public class BigDecimalCoder extends CustomCoder<BigDecimal> {
+public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
 
   public static BigDecimalCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
index 81c5e94..c3c7a96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link BigEndianIntegerCoder} encodes {@link Integer Integers} in 4 bytes, big-endian.
  */
-public class BigEndianIntegerCoder extends CustomCoder<Integer> {
+public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
 
   public static BigEndianIntegerCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
index 173e910..5ef4878 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -30,9 +29,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link BigEndianLongCoder} encodes {@link Long}s in 8 bytes, big-endian.
  */
-public class BigEndianLongCoder extends CustomCoder<Long> {
+public class BigEndianLongCoder extends AtomicCoder<Long> {
 
-  @JsonCreator
   public static BigEndianLongCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
index a739da7..6d14d17 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
@@ -28,7 +28,7 @@ import java.math.BigInteger;
  * A {@link BigIntegerCoder} encodes a {@link BigInteger} as a byte array containing the big endian
  * two's-complement representation, encoded via {@link ByteArrayCoder}.
  */
-public class BigIntegerCoder extends CustomCoder<BigInteger> {
+public class BigIntegerCoder extends AtomicCoder<BigInteger> {
 
   public static BigIntegerCoder of() {
     return INSTANCE;
@@ -55,7 +55,7 @@ public class BigIntegerCoder extends CustomCoder<BigInteger> {
   }
 
   @Override
-  public void verifyDeterministic() throws NonDeterministicException {
+  public void verifyDeterministic() {
     BYTE_ARRAY_CODER.verifyDeterministic();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
index 5a4db24..f49776b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
@@ -25,7 +25,7 @@ import java.util.BitSet;
 /**
  * Coder for {@link BitSet}.
  */
-public class BitSetCoder extends CustomCoder<BitSet> {
+public class BitSetCoder extends AtomicCoder<BitSet> {
   private static final BitSetCoder INSTANCE = new BitSetCoder();
   private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
 
@@ -53,7 +53,7 @@ public class BitSetCoder extends CustomCoder<BitSet> {
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
     verifyDeterministic(
-        "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
+        this, "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index cba8d49..28cb627 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.io.ByteStreams;
 import java.io.IOException;
 import java.io.InputStream;
@@ -40,9 +39,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * encoded via a {@link VarIntCoder}.</li>
  * </ul>
  */
-public class ByteArrayCoder extends StructuredCoder<byte[]> {
+public class ByteArrayCoder extends AtomicCoder<byte[]> {
 
-  @JsonCreator
   public static ByteArrayCoder of() {
     return INSTANCE;
   }
@@ -117,7 +115,7 @@ public class ByteArrayCoder extends StructuredCoder<byte[]> {
   }
 
   @Override
-  public void verifyDeterministic() throws NonDeterministicException {}
+  public void verifyDeterministic() {}
 
   /**
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
index 1a1be64..6e4318e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link ByteCoder} encodes {@link Byte} values in 1 byte using Java serialization.
  */
-public class ByteCoder extends CustomCoder<Byte> {
+public class ByteCoder extends AtomicCoder<Byte> {
 
   public static ByteCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
index 2a1d792..4f05c95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
@@ -239,11 +239,12 @@ public final class CoderFactories {
     }
 
     /**
-     * If {@code coderType} is a subclass of {@link Coder} for a specific
-     * type {@code T}, returns {@code T.class}. Otherwise, raises IllegalArgumentException.
+     * If {@code coderType} is a subclass of {@link Coder} for a specific type {@code T}, returns
+     * {@code T.class}. Otherwise, raises IllegalArgumentException.
      */
-    private <T> TypeDescriptor<T> getCodedType(TypeDescriptor<? extends Coder> coderType) {
-      TypeDescriptor<?> coderSupertype = coderType.getSupertype(Coder.class);
+    private <T, CoderT extends Coder> TypeDescriptor<T> getCodedType(
+        TypeDescriptor<CoderT> coderType) {
+      TypeDescriptor<? super CoderT> coderSupertype = coderType.getSupertype(Coder.class);
       ParameterizedType coderIface = (ParameterizedType) coderSupertype.getType();
       @SuppressWarnings("unchecked")
       TypeDescriptor<T> token =

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
index 06e7dae..12bc5e8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link DoubleCoder} encodes {@link Double} values in 8 bytes using Java serialization.
  */
-public class DoubleCoder extends CustomCoder<Double> {
+public class DoubleCoder extends AtomicCoder<Double> {
 
   public static DoubleCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index 10a83ef..7b49d1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -29,7 +29,7 @@ import org.joda.time.ReadableDuration;
  * A {@link Coder} that encodes a joda {@link Duration} as a {@link Long} using the format of
  * {@link VarLongCoder}.
  */
-public class DurationCoder extends CustomCoder<ReadableDuration> {
+public class DurationCoder extends AtomicCoder<ReadableDuration> {
 
   public static DurationCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index cfd1979..56ed12b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -29,7 +29,7 @@ import org.joda.time.Instant;
  * A {@link Coder} for joda {@link Instant} that encodes it as a big endian {@link Long}
  * shifted such that lexicographic ordering of the bytes corresponds to chronological order.
  */
-public class InstantCoder extends CustomCoder<Instant> {
+public class InstantCoder extends AtomicCoder<Instant> {
   public static InstantCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 8a689f7..35b7449 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -89,8 +89,8 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
 
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic("Key coder must be deterministic", getKeyCoder());
-    verifyDeterministic("Value coder must be deterministic", getValueCoder());
+    verifyDeterministic(this, "Key coder must be deterministic", getKeyCoder());
+    verifyDeterministic(this, "Value coder must be deterministic", getValueCoder());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
index 32467d2..70bbf93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
@@ -58,8 +58,7 @@ public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
    */
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "ListCoder.elemCoder must be deterministic", getElemCoder());
+    verifyDeterministic(this, "ListCoder.elemCoder must be deterministic", getElemCoder());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index e2c4e28..da2bf50 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.values.TypeParameter;
  * @param <K> the type of the keys of the KVs being transcoded
  * @param <V> the type of the values of the KVs being transcoded
  */
-public class MapCoder<K, V> extends CustomCoder<Map<K, V>> {
+public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
   /**
    * Produces a MapCoder with the given keyCoder and valueCoder.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index 747d91c..d1eea9a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * @param <T> the type of the values being transcoded
  */
-public class NullableCoder<T> extends CustomCoder<T> {
+public class NullableCoder<T> extends StructuredCoder<T> {
   public static <T> NullableCoder<T> of(Coder<T> valueCoder) {
     if (valueCoder instanceof NullableCoder) {
       return (NullableCoder<T>) valueCoder;
@@ -93,11 +93,11 @@ public class NullableCoder<T> extends CustomCoder<T> {
   /**
    * {@code NullableCoder} is deterministic if the nested {@code Coder} is.
    *
-   * {@inheritDoc}
+   * <p>{@inheritDoc}
    */
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic("Value coder must be deterministic", valueCoder);
+    verifyDeterministic(this, "Value coder must be deterministic", valueCoder);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index f0a0969..42931ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * If in a nested context, prefixes the string with an integer length field,
  * encoded via a {@link VarIntCoder}.
  */
-public class StringUtf8Coder extends CustomCoder<String> {
+public class StringUtf8Coder extends AtomicCoder<String> {
 
   public static StringUtf8Coder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
index 91a46ea..9743c4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * A {@link Coder} that encodes {@code Integer Integers} as the ASCII bytes of
  * their textual, decimal, representation.
  */
-public class TextualIntegerCoder extends CustomCoder<Integer> {
+public class TextualIntegerCoder extends AtomicCoder<Integer> {
 
   public static TextualIntegerCoder of() {
     return new TextualIntegerCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
index fcc0335..30f9c09 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * numbers always take 5 bytes, so {@link BigEndianIntegerCoder} may be preferable for
  * integers that are known to often be large or negative.
  */
-public class VarIntCoder extends CustomCoder<Integer> {
+public class VarIntCoder extends AtomicCoder<Integer> {
 
   public static VarIntCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
index a65fa5e..829bd20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.InputStream;
 import java.io.OutputStream;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -25,9 +24,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link Coder} for {@link Void}. Uses zero bytes per {@link Void}.
  */
-public class VoidCoder extends CustomCoder<Void> {
+public class VoidCoder extends AtomicCoder<Void> {
 
-  @JsonCreator
   public static VoidCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 0daf5dc..20fab9b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -43,8 +43,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
@@ -938,7 +938,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   /**
    * A coder for {@link FileResult} objects.
    */
-  public static final class FileResultCoder extends CustomCoder<FileResult> {
+  public static final class FileResultCoder extends AtomicCoder<FileResult> {
     private static final FileResultCoder INSTANCE = new FileResultCoder();
     private final NullableCoder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 5432f09..b05f223 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -762,13 +763,28 @@ public class ApproximateQuantiles {
     }
 
     @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (!(other instanceof QuantileStateCoder)) {
+        return false;
+      }
+      QuantileStateCoder<?, ?> that = (QuantileStateCoder<?, ?>) other;
+      return Objects.equals(this.elementCoder, that.elementCoder)
+          && Objects.equals(this.compareFn, that.compareFn);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(elementCoder, compareFn);
+    }
+
+    @Override
     public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic(this, "QuantileState.ElementCoder must be deterministic", elementCoder);
       verifyDeterministic(
-          "QuantileState.ElementCoder must be deterministic",
-          elementCoder);
-      verifyDeterministic(
-          "QuantileState.ElementListCoder must be deterministic",
-          elementListCoder);
+          this, "QuantileState.ElementListCoder must be deterministic", elementListCoder);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 666db3b..b9cdbd5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,10 +36,10 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DelegateCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -522,7 +523,7 @@ public class Combine {
   /**
    * A {@link Coder} for a {@link Holder}.
    */
-  private static class HolderCoder<V> extends CustomCoder<Holder<V>> {
+  private static class HolderCoder<V> extends StructuredCoder<Holder<V>> {
 
     private Coder<V> valueCoder;
 
@@ -552,6 +553,11 @@ public class Combine {
     }
 
     @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.singletonList(valueCoder);
+    }
+
+    @Override
     public void verifyDeterministic() throws NonDeterministicException {
       valueCoder.verifyDeterministic();
     }
@@ -1954,7 +1960,7 @@ public class Combine {
       }
 
       private static class InputOrAccumCoder<InputT, AccumT>
-          extends CustomCoder<InputOrAccum<InputT, AccumT>> {
+          extends StructuredCoder<InputOrAccum<InputT, AccumT>> {
 
         private final Coder<InputT> inputCoder;
         private final Coder<AccumT> accumCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index cc02dcf..d4c97bc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
@@ -524,7 +524,7 @@ public class CombineFns {
     }
   }
 
-  private static class ComposedAccumulatorCoder extends CustomCoder<Object[]> {
+  private static class ComposedAccumulatorCoder extends StructuredCoder<Object[]> {
     private List<Coder<Object>> coders;
     private int codersCount;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 78a6cd1..753e14c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -23,10 +23,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
 import java.util.Iterator;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.util.VarInt;
@@ -167,7 +167,7 @@ public class Count {
     @Override
     public Coder<long[]> getAccumulatorCoder(CoderRegistry registry,
                                              Coder<T> inputCoder) {
-      return new CustomCoder<long[]>() {
+      return new AtomicCoder<long[]>() {
         @Override
         public void encode(long[] value, OutputStream outStream, Context context)
             throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index a6808cf..a309954 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -22,11 +22,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Objects;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
 
@@ -180,9 +180,8 @@ public class Mean {
     }
   }
 
-  static class CountSumCoder<NumT extends Number>
-  extends CustomCoder<CountSum<NumT>> {
-     private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
+  static class CountSumCoder<NumT extends Number> extends AtomicCoder<CountSum<NumT>> {
+    private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
      private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
 
      @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index e42c0b2..9d5db74 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -551,8 +552,7 @@ public class Top {
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic(
-          "HeapCoder requires a deterministic list coder", listCoder);
+      verifyDeterministic(this, "HeapCoder requires a deterministic list coder", listCoder);
     }
 
     @Override
@@ -568,5 +568,24 @@ public class Top {
             throws Exception {
       listCoder.registerByteSizeObserver(value.asList(), observer, context);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (!(other instanceof BoundedHeapCoder)) {
+        return false;
+      }
+      BoundedHeapCoder<?, ?> that = (BoundedHeapCoder<?, ?>) other;
+      return Objects.equals(this.compareFn, that.compareFn)
+          && Objects.equals(this.listCoder, that.listCoder)
+          && this.maximumSize == that.maximumSize;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(compareFn, listCoder, maximumSize);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 02e1185..e9a3571 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -300,7 +300,7 @@ public class CoGbkResult {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
-          "CoGbkResult requires the union coder to be deterministic", unionCoder);
+          this, "CoGbkResult requires the union coder to be deterministic", unionCoder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
index f411cd1..4a2a286 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
@@ -23,14 +23,14 @@ import java.io.OutputStream;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 
 /**
  * A UnionCoder encodes RawUnionValues.
  */
-public class UnionCoder extends CustomCoder<RawUnionValue> {
+public class UnionCoder extends StructuredCoder<RawUnionValue> {
   // TODO: Think about how to integrate this with a schema object (i.e.
   // a tuple of tuple tags).
   /**
@@ -134,7 +134,6 @@ public class UnionCoder extends CustomCoder<RawUnionValue> {
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
     verifyDeterministic(
-        "UnionCoder is only deterministic if all element coders are",
-        elementCoders);
+        this, "UnionCoder is only deterministic if all element coders are", elementCoders);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index faf3ca9..79ce2f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -26,9 +26,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Objects;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -307,7 +307,7 @@ public final class PaneInfo {
   /**
    * A Coder for encoding PaneInfo instances.
    */
-  public static class PaneInfoCoder extends CustomCoder<PaneInfo> {
+  public static class PaneInfoCoder extends AtomicCoder<PaneInfo> {
     private enum Encoding {
       FIRST,
       ONE_INDEX,
@@ -340,6 +340,12 @@ public final class PaneInfo {
 
     public static final PaneInfoCoder INSTANCE = new PaneInfoCoder();
 
+    public static PaneInfoCoder of() {
+      return INSTANCE;
+    }
+
+    private PaneInfoCoder() {}
+
     @Override
     public void encode(PaneInfo value, final OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
index b0e9b5c..b646bf6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
@@ -21,15 +21,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.BitSet;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 
 /**
  * Coder for the BitSet used to track child-trigger finished states.
+ *
+ * @deprecated use {@link org.apache.beam.sdk.coders.BitSetCoder} instead
  */
 @Deprecated
-public class BitSetCoder extends CustomCoder<BitSet> {
+public class BitSetCoder extends AtomicCoder<BitSet> {
 
   private static final BitSetCoder INSTANCE = new BitSetCoder();
   private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
@@ -54,8 +56,7 @@ public class BitSetCoder extends CustomCoder<BitSet> {
 
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
+    BYTE_ARRAY_CODER.verifyDeterministic();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 13e499d..1e72550 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -663,11 +663,9 @@ public abstract class WindowedValue<T> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
-          "FullWindowedValueCoder requires a deterministic valueCoder",
-          valueCoder);
+          this, "FullWindowedValueCoder requires a deterministic valueCoder", valueCoder);
       verifyDeterministic(
-          "FullWindowedValueCoder requires a deterministic windowCoder",
-          windowCoder);
+          this, "FullWindowedValueCoder requires a deterministic windowCoder", windowCoder);
     }
 
     @Override
@@ -728,6 +726,7 @@ public abstract class WindowedValue<T> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
+          this,
           "ValueOnlyWindowedValueCoder requires a deterministic valueCoder",
           valueCoder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index cde9a40..a9f3929 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -23,11 +23,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.joda.time.Instant;
 
@@ -85,7 +86,7 @@ public class TimestampedValue<V> {
   /////////////////////////////////////////////////////////////////////////////
 
   /** A {@link Coder} for {@link TimestampedValue}. */
-  public static class TimestampedValueCoder<T> extends CustomCoder<TimestampedValue<T>> {
+  public static class TimestampedValueCoder<T> extends StructuredCoder<TimestampedValue<T>> {
 
     private final Coder<T> valueCoder;
 
@@ -119,8 +120,7 @@ public class TimestampedValue<V> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
-          "TimestampedValueCoder requires a deterministic valueCoder",
-          valueCoder);
+          this, "TimestampedValueCoder requires a deterministic valueCoder", valueCoder);
     }
 
     @Override
@@ -141,6 +141,11 @@ public class TimestampedValue<V> {
       return new TypeDescriptor<TimestampedValue<T>>() {}.where(
           new TypeParameter<T>() {}, valueCoder.getEncodedTypeDescriptor());
     }
+
+    @Override
+    public List<? extends Coder<?>> getComponents() {
+      return Collections.singletonList(valueCoder);
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index 1fd356b..3ecbaa2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -24,8 +24,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.joda.time.Instant;
@@ -56,7 +56,7 @@ public abstract class ValueInSingleWindow<T> {
   }
 
   /** A coder for {@link ValueInSingleWindow}. */
-  public static class Coder<T> extends CustomCoder<ValueInSingleWindow<T>> {
+  public static class Coder<T> extends StructuredCoder<ValueInSingleWindow<T>> {
     private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
     private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
index 0d92f40..3f057e1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
@@ -27,7 +27,7 @@ import java.util.Objects;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**
@@ -85,7 +85,7 @@ public class ValueWithRecordId<ValueT> {
    * A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}.
    */
   public static class ValueWithRecordIdCoder<ValueT>
-      extends CustomCoder<ValueWithRecordId<ValueT>> {
+      extends StructuredCoder<ValueWithRecordId<ValueT>> {
     public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) {
       return new ValueWithRecordIdCoder<>(valueCoder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index fe21a1c..5107355 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -99,7 +99,7 @@ public class CoderRegistryTest {
   }
 
   @SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes
-  private class MyListCoder extends CustomCoder<List> {
+  private class MyListCoder extends AtomicCoder<List> {
     @Override
     public void encode(List value, OutputStream outStream, Context context)
         throws CoderException, IOException {
@@ -441,7 +441,7 @@ public class CoderRegistryTest {
 
   private static class MyValue { }
 
-  private static class MyValueCoder extends CustomCoder<MyValue> {
+  private static class MyValueCoder extends AtomicCoder<MyValue> {
 
     private static final MyValueCoder INSTANCE = new MyValueCoder();
     private static final TypeDescriptor<MyValue> TYPE_DESCRIPTOR = TypeDescriptor.of(MyValue.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
index 5ff272f..8aeb22a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
@@ -24,8 +24,6 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
@@ -108,29 +106,6 @@ public class DelegateCoderTest implements Serializable {
   private static final String TEST_ENCODING_ID = "test-encoding-id";
   private static final String TEST_ALLOWED_ENCODING = "test-allowed-encoding";
 
-  private static class TestAllowedEncodingsCoder extends CustomCoder<Integer> {
-
-    @Override
-    public void encode(Integer value, OutputStream outstream, Context context) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Integer decode(InputStream instream, Context context) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void verifyDeterministic() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Collections.emptyList();
-    }
-  }
-
   @Test
   public void testCoderEquals() throws Exception {
     DelegateCoder.CodingFunction<Integer, Integer> identityFn =

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index 894d2d1..c0a4bed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -169,7 +169,7 @@ public class NullableCoderTest {
     assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(String.class)));
   }
 
-  private static class EntireStreamExpectingCoder extends CustomCoder<String> {
+  private static class EntireStreamExpectingCoder extends AtomicCoder<String> {
     @Override
     public void encode(
         String value, OutputStream outStream, Context context) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
index f337f36..164d221 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
@@ -24,6 +24,7 @@ import com.google.common.base.Strings;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -47,7 +48,7 @@ public class CoderPropertiesTest {
   }
 
   /** A coder that says it is not deterministic but actually is. */
-  public static class NonDeterministicCoder extends CustomCoder<String> {
+  public static class NonDeterministicCoder extends AtomicCoder<String> {
     @Override
     public void encode(String value, OutputStream outStream, Context context)
         throws CoderException, IOException {
@@ -59,17 +60,23 @@ public class CoderPropertiesTest {
         throws CoderException, IOException {
       return StringUtf8Coder.of().decode(inStream, context);
     }
+
+    public void verifyDeterministic() throws NonDeterministicException {
+      throw new NonDeterministicException(this, "Not Deterministic");
+    }
   }
 
   @Test
   public void testNonDeterministicCoder() throws Exception {
     try {
       CoderProperties.coderDeterministic(new NonDeterministicCoder(), "TestData", "TestData");
-      fail("Expected AssertionError");
     } catch (AssertionError error) {
       assertThat(error.getMessage(),
           CoreMatchers.containsString("Expected that the coder is deterministic"));
+      // success!
+      return;
     }
+    fail("Expected AssertionError");
   }
 
   @Test
@@ -84,7 +91,7 @@ public class CoderPropertiesTest {
   }
 
   /** A coder that is non-deterministic because it adds a string to the value. */
-  private static class BadDeterminsticCoder extends CustomCoder<String> {
+  private static class BadDeterminsticCoder extends AtomicCoder<String> {
     public BadDeterminsticCoder() {
     }
 
@@ -141,6 +148,17 @@ public class CoderPropertiesTest {
       String decodedValue = StringUtf8Coder.of().decode(inStream, context);
       return decodedValue.substring(0, decodedValue.length() - changedState);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof StateChangingSerializingCoder
+          && ((StateChangingSerializingCoder) other).changedState == this.changedState;
+    }
+
+    @Override
+    public int hashCode() {
+      return changedState;
+    }
   }
 
   @Test
@@ -175,6 +193,17 @@ public class CoderPropertiesTest {
         throws CoderException, IOException {
       return StringUtf8Coder.of().decode(inStream, context);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      return (other instanceof ForgetfulSerializingCoder)
+          && ((ForgetfulSerializingCoder) other).lostState == lostState;
+    }
+
+    @Override
+    public int hashCode() {
+      return lostState;
+    }
   }
 
   @Test
@@ -185,7 +214,7 @@ public class CoderPropertiesTest {
   }
 
   /** A coder which closes the underlying stream during encoding and decoding. */
-  public static class ClosingCoder extends CustomCoder<String> {
+  public static class ClosingCoder extends AtomicCoder<String> {
     @Override
     public void encode(String value, OutputStream outStream, Context context) throws IOException {
       outStream.close();


[4/6] beam git commit: Re-add AtomicCoder

Posted by tg...@apache.org.
Re-add AtomicCoder

This is a moderately useful base class for coders which take no
configuration.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/655947b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/655947b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/655947b5

Branch: refs/heads/master
Commit: 655947b597972d9fd6e1d3a777970b0b1152fa05
Parents: 86a9499
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 10:08:32 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 13:30:51 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/coders/AtomicCoder.java | 86 ++++++++++++++++++++
 1 file changed, 86 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/655947b5/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
new file mode 100644
index 0000000..528cfb0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.coders;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link Coder} that has no component {@link Coder Coders} or other state.
+ *
+ * <p>Note that, unless the behavior is overridden, atomic coders are presumed to be deterministic.
+ *
+ * <p>All atomic coders of the same class are considered to be equal to each other. As a result,
+ * an {@link AtomicCoder} should have no associated state.
+ *
+ * @param <T> the type of the values being transcoded
+ */
+public abstract class AtomicCoder<T> extends StructuredCoder<T> {
+  /**
+   * Returns an empty list, because an {@link AtomicCoder} has no components.
+   *
+   * <p>{@link CoderFactories#fromStaticMethods(Class)} builds a {@link CoderFactory} from the
+   * {@code #of()} method and this method, used to determine the components of an object.
+   *
+   * @param exampleValue unused, but part of the latent interface expected by {@link
+   *     CoderFactories#fromStaticMethods}
+   * @return an empty list, always
+   */
+  public static <T> List<Object> getInstanceComponents(T exampleValue) {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Does nothing: atomic coders are presumed deterministic unless a subclass overrides this.
+   *
+   * @throws NonDeterministicException never thrown by this no-op implementation
+   */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {}
+
+  /**
+   * Returns an empty list rather than {@code null}: an atomic coder has no coder arguments, and
+   * an empty list agrees with {@link #getComponents()} and can be iterated without a null check.
+   */
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Collections.emptyList();
+  }
+
+  /** Returns the empty {@link List}, as an atomic coder has no component coders. */
+  @Override
+  public final List<? extends Coder<?>> getComponents() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Compares by class only: returns true if the other object is non-null and has exactly the
+   * same class as this {@link AtomicCoder}.
+   */
+  @Override
+  public final boolean equals(Object other) {
+    return other != null && this.getClass().equals(other.getClass());
+  }
+
+  /** Returns the hash code of this coder's class, consistent with {@link #equals(Object)}. */
+  @Override
+  public final int hashCode() {
+    return this.getClass().hashCode();
+  }
+}


[2/6] beam git commit: Reparent many Coders to Atomic or StructuredCoder

Posted by tg...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index cfe7436..2ef892c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -35,8 +35,8 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
@@ -88,7 +88,7 @@ public class PAssertTest implements Serializable {
     }
   }
 
-  private static class NotSerializableObjectCoder extends CustomCoder<NotSerializableObject> {
+  private static class NotSerializableObjectCoder extends AtomicCoder<NotSerializableObject> {
     private NotSerializableObjectCoder() { }
     private static final NotSerializableObjectCoder INSTANCE = new NotSerializableObjectCoder();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
index ddc92d6..db5ff2e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
@@ -29,8 +29,8 @@ import com.google.common.collect.ImmutableList;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.hamcrest.Matchers;
@@ -151,7 +151,7 @@ public class SerializableMatchersTest implements Serializable {
     }
   }
 
-  private static class NotSerializableClassCoder extends CustomCoder<NotSerializableClass> {
+  private static class NotSerializableClassCoder extends AtomicCoder<NotSerializableClass> {
     @Override
     public void encode(NotSerializableClass value, OutputStream outStream, Coder.Context context) {
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
index 38a2fa2..546683b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
@@ -24,8 +24,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collections;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.SerializableUtils;
@@ -72,12 +72,12 @@ public class WindowSupplierTest {
         Collections.<BoundedWindow>singleton(window));
   }
 
-  private static class FailingCoder extends CustomCoder<BoundedWindow>  {
+  private static class FailingCoder extends AtomicCoder<BoundedWindow> {
     @Override
     public void encode(
         BoundedWindow value, OutputStream outStream, Context context)
         throws CoderException, IOException {
-      throw new CoderException("Test Enccode Exception");
+      throw new CoderException("Test Encode Exception");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 9250dfa..62edac9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -28,10 +28,10 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -326,7 +326,7 @@ public class  CombineFnsTest {
     }
   }
 
-  private static class UserStringCoder extends CustomCoder<UserString> {
+  private static class UserStringCoder extends AtomicCoder<UserString> {
     public static UserStringCoder of() {
       return INSTANCE;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 82c2504..12619e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -41,12 +41,12 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -874,7 +874,7 @@ public class CombineTest implements Serializable {
     /**
      * A {@link Coder} for {@link CountSum}.
      */
-    private class CountSumCoder extends CustomCoder<CountSum> {
+    private class CountSumCoder extends AtomicCoder<CountSum> {
       @Override
       public void encode(CountSum value, OutputStream outStream,
           Context context) throws CoderException, IOException {
@@ -923,7 +923,7 @@ public class CombineTest implements Serializable {
       }
 
       public static Coder<Accumulator> getCoder() {
-        return new CustomCoder<Accumulator>() {
+        return new AtomicCoder<Accumulator>() {
           @Override
           public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context)
               throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 89a1f33..76f61b3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -36,10 +36,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -132,7 +132,7 @@ public class CreateTest {
   static class Record2 extends Record {
   }
 
-  private static class RecordCoder extends CustomCoder<Record> {
+  private static class RecordCoder extends AtomicCoder<Record> {
     @Override
     public void encode(Record value, OutputStream outStream, Context context)
         throws CoderException, IOException {}
@@ -201,7 +201,7 @@ public class CreateTest {
       return myString.equals(((UnserializableRecord) o).myString);
     }
 
-    static class UnserializableRecordCoder extends CustomCoder<UnserializableRecord> {
+    static class UnserializableRecordCoder extends AtomicCoder<UnserializableRecord> {
       private final Coder<String> stringCoder = StringUtf8Coder.of();
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 9cb642a..6982e01 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -38,9 +38,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -453,7 +453,7 @@ public class GroupByKeyTest {
   /**
    * Deterministic {@link Coder} for {@link BadEqualityKey}.
    */
-  static class DeterministicKeyCoder extends CustomCoder<BadEqualityKey> {
+  static class DeterministicKeyCoder extends AtomicCoder<BadEqualityKey> {
 
     public static DeterministicKeyCoder of() {
       return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index ffdf3d0..073957f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,9 +55,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.SetCoder;
@@ -976,7 +976,7 @@ public class ParDoTest implements Serializable {
 
   private static class TestDummy { }
 
-  private static class TestDummyCoder extends CustomCoder<TestDummy> {
+  private static class TestDummyCoder extends AtomicCoder<TestDummy> {
     private TestDummyCoder() { }
     private static final TestDummyCoder INSTANCE = new TestDummyCoder();
 
@@ -1085,7 +1085,7 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  private static class MyIntegerCoder extends CustomCoder<MyInteger> {
+  private static class MyIntegerCoder extends AtomicCoder<MyInteger> {
     private static final MyIntegerCoder INSTANCE = new MyIntegerCoder();
 
     private final VarIntCoder delegate = VarIntCoder.of();

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index e72c540..84f3d69 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -39,9 +39,9 @@ import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -505,7 +505,7 @@ public class ViewTest implements Serializable {
     pipeline.run();
   }
 
-  private static class NonDeterministicStringCoder extends CustomCoder<String> {
+  private static class NonDeterministicStringCoder extends AtomicCoder<String> {
     @Override
     public void encode(String value, OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 19b6092..f26dd59 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -35,8 +35,8 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -270,7 +270,7 @@ public class DoFnInvokersTest {
   private abstract static class SomeRestrictionTracker
       implements RestrictionTracker<SomeRestriction> {}
 
-  private static class SomeRestrictionCoder extends CustomCoder<SomeRestriction> {
+  private static class SomeRestrictionCoder extends AtomicCoder<SomeRestriction> {
     public static SomeRestrictionCoder of() {
       return new SomeRestrictionCoder();
     }
@@ -392,7 +392,7 @@ public class DoFnInvokersTest {
     public void checkDone() throws IllegalStateException {}
   }
 
-  private static class CoderForDefaultTracker extends CustomCoder<RestrictionWithDefaultTracker> {
+  private static class CoderForDefaultTracker extends AtomicCoder<RestrictionWithDefaultTracker> {
     public static CoderForDefaultTracker of() {
       return new CoderForDefaultTracker();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
index 0db5355..7230a8b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
@@ -24,10 +24,10 @@ import static org.mockito.Mockito.mock;
 
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.testing.CoderPropertiesTest.ClosingCoder;
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,7 +44,7 @@ public class CoderUtilsTest {
   @Rule
   public transient ExpectedException expectedException = ExpectedException.none();
 
-  static class TestCoder extends CustomCoder<Integer> {
+  static class TestCoder extends AtomicCoder<Integer> {
     public static TestCoder of() {
       return new TestCoder();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
index 2d5baf2..6ba1d4a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
@@ -25,9 +25,9 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -85,7 +85,7 @@ public class SerializableUtilsTest {
   }
 
   /** A {@link Coder} that is not serializable by Java. */
-  private static class UnserializableCoderByJava extends CustomCoder<Object> {
+  private static class UnserializableCoderByJava extends AtomicCoder<Object> {
     private final Object unserializableField = new Object();
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
index d4e6f63..0781cf1 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
@@ -22,9 +22,9 @@ import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * <p>When this code is used in a nested {@link Coder.Context}, the serialized {@link ByteString}
  * objects are first delimited by their size.
  */
-public class ByteStringCoder extends CustomCoder<ByteString> {
+public class ByteStringCoder extends AtomicCoder<ByteString> {
 
   public static ByteStringCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
index 26b4b56..f04e9b9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
@@ -25,7 +25,7 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 
 
@@ -34,11 +34,14 @@ import org.apache.beam.sdk.coders.VarIntCoder;
  */
 @VisibleForTesting
 class ShardedKeyCoder<KeyT>
-    extends CustomCoder<ShardedKey<KeyT>> {
+    extends StructuredCoder<ShardedKey<KeyT>> {
   public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
     return new ShardedKeyCoder<>(keyCoder);
   }
 
+  private final Coder<KeyT> keyCoder;
+  private final VarIntCoder shardNumberCoder;
+
   protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
     this.keyCoder = keyCoder;
     this.shardNumberCoder = VarIntCoder.of();
@@ -68,7 +71,4 @@ class ShardedKeyCoder<KeyT>
   public void verifyDeterministic() throws NonDeterministicException {
     keyCoder.verifyDeterministic();
   }
-
-  Coder<KeyT> keyCoder;
-  VarIntCoder shardNumberCoder;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index 8a06d13..3059e2a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -21,12 +21,12 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
 /** A coder for {@link TableDestination} objects. */
-public class TableDestinationCoder extends CustomCoder<TableDestination> {
+public class TableDestinationCoder extends AtomicCoder<TableDestination> {
   private static final TableDestinationCoder INSTANCE = new TableDestinationCoder();
   private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index c3e48a4..2b1988a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -22,15 +22,15 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
 /**
  * Defines a coder for {@link TableRowInfo} objects.
  */
 @VisibleForTesting
-class TableRowInfoCoder extends CustomCoder<TableRowInfo> {
+class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
   private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder();
 
   public static TableRowInfoCoder of() {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index a1ca41b..7ca8958 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -23,15 +23,15 @@ import com.google.api.services.bigquery.model.TableRow;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format.
  */
-public class TableRowJsonCoder extends CustomCoder<TableRow> {
+public class TableRowJsonCoder extends AtomicCoder<TableRow> {
 
   public static TableRowJsonCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 70aa135..b896083 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -26,12 +26,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -71,7 +73,7 @@ class WriteBundlesToFiles<DestinationT>
   }
 
   /** a coder for the {@link Result} class. */
-  public static class ResultCoder<DestinationT> extends CustomCoder<Result<DestinationT>> {
+  public static class ResultCoder<DestinationT> extends StructuredCoder<Result<DestinationT>> {
     private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
     private static final VarLongCoder longCoder = VarLongCoder.of();
     private final Coder<DestinationT> destinationCoder;
@@ -105,6 +107,11 @@ class WriteBundlesToFiles<DestinationT>
     }
 
     @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.singletonList(destinationCoder);
+    }
+
+    @Override
     public void verifyDeterministic() {}
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 031d9a0..9f04a6c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -31,11 +31,11 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
@@ -100,7 +100,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
   /**
    * Coder for conveying outgoing messages between internal stages.
    */
-  private static class OutgoingMessageCoder extends CustomCoder<OutgoingMessage> {
+  private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
     private static final NullableCoder<String> RECORD_ID_CODER =
         NullableCoder.of(StringUtf8Coder.of());
     private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index c2cbe73..c16b8fb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -46,8 +46,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -118,7 +118,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
   /**
    * Coder for checkpoints.
    */
-  private static final PubsubCheckpointCoder CHECKPOINT_CODER = new PubsubCheckpointCoder();
+  private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = PubsubCheckpointCoder.of();
 
   /**
    * Maximum number of messages per pull.
@@ -357,11 +357,17 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
   }
 
   /** The coder for our checkpoints. */
-  private static class PubsubCheckpointCoder extends CustomCoder<PubsubCheckpoint> {
+  private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint> {
     private static final Coder<String> SUBSCRIPTION_PATH_CODER =
         NullableCoder.of(StringUtf8Coder.of());
     private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());
 
+    public static <T> PubsubCheckpointCoder<T> of() {
+      return new PubsubCheckpointCoder<>();
+    }
+
+    private PubsubCheckpointCoder() {}
+
     @Override
     public void encode(PubsubCheckpoint value, OutputStream outStream, Context context)
         throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index a3b21ee..6c15f87 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -73,10 +73,10 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -720,7 +720,7 @@ public class BigQueryIOTest implements Serializable {
   /**
    * Coder for @link{PartitionedGlobalWindow}.
    */
-  private static class PartitionedGlobalWindowCoder extends CustomCoder<PartitionedGlobalWindow> {
+  private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> {
     @Override
     public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context)
         throws IOException, CoderException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
index 9589fb1..15877b0 100644
--- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -92,4 +93,21 @@ public class WritableCoder<T extends Writable> extends CustomCoder<T> {
         "Hadoop Writable may be non-deterministic.");
   }
 
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+    if (!(other instanceof WritableCoder)) {
+      return false;
+    }
+    WritableCoder<?> that = (WritableCoder<?>) other;
+    return Objects.equals(this.type, that.type);
+  }
+
+  @Override
+  public int hashCode() {
+    return type.hashCode();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index 35a8863..7cc043c 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.Mut
  * A {@link Coder} that serializes and deserializes the {@link Mutation} objects using {@link
  * ProtobufUtil}.
  */
-class HBaseMutationCoder extends CustomCoder<Mutation> implements Serializable {
+class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
   private static final HBaseMutationCoder INSTANCE = new HBaseMutationCoder();
 
   private HBaseMutationCoder() {}

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
index 0004d03..24a5f7f 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
  * A {@link Coder} that serializes and deserializes the {@link Result} objects using {@link
  * ProtobufUtil}.
  */
-class HBaseResultCoder extends CustomCoder<Result> implements Serializable {
+class HBaseResultCoder extends AtomicCoder<Result> implements Serializable {
   private static final HBaseResultCoder INSTANCE = new HBaseResultCoder();
 
   private HBaseResultCoder() {}

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index f4de76a..e676455 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -55,11 +55,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.io.Read.Unbounded;
@@ -1595,11 +1595,11 @@ public class KafkaIO {
     }
   }
 
-  private static class NullOnlyCoder<T> extends CustomCoder<T> {
+  private static class NullOnlyCoder<T> extends AtomicCoder<T> {
     @Override
     public void encode(T value, OutputStream outStream, Context context) {
       checkArgument(value == null, "Can only encode nulls");
-      // Encode as the empty string.
+      // Encode as no bytes.
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 160e8ce..0f5dc4c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -23,9 +23,9 @@ import java.io.OutputStream;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.values.KV;
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.KV;
 /**
  * {@link Coder} for {@link KafkaRecord}.
  */
-public class KafkaRecordCoder<K, V> extends CustomCoder<KafkaRecord<K, V>> {
+public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
 
   private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
   private static final VarLongCoder longCoder = VarLongCoder.of();

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index 4da2b05..77fe127 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
@@ -32,7 +32,7 @@ import org.joda.time.Instant;
 /**
  * A {@link Coder} for {@link KinesisRecord}.
  */
-class KinesisRecordCoder extends CustomCoder<KinesisRecord> {
+class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
     private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
     private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
     private static final InstantCoder INSTANT_CODER = InstantCoder.of();

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
index 812bc70..75a4619 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -23,6 +23,7 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Objects;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
@@ -139,6 +140,23 @@ public class JAXBCoder<T> extends CustomCoder<T> {
     return TypeDescriptor.of(jaxbClass);
   }
 
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+    if (!(other instanceof JAXBCoder)) {
+      return false;
+    }
+    JAXBCoder<?> that = (JAXBCoder<?>) other;
+    return Objects.equals(this.jaxbClass, that.jaxbClass);
+  }
+
+  @Override
+  public int hashCode() {
+    return jaxbClass.hashCode();
+  }
+
   private static class CloseIgnoringInputStream extends FilterInputStream {
 
     protected CloseIgnoringInputStream(InputStream in) {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
index 940d596..2b4503a 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.xml.bind.annotation.XmlRootElement;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.testing.CoderProperties;
@@ -171,7 +171,7 @@ public class JAXBCoderTest {
   /**
    * A coder that surrounds the value with two values, to demonstrate nesting.
    */
-  private static class TestCoder extends CustomCoder<TestType> {
+  private static class TestCoder extends StructuredCoder<TestType> {
     private final JAXBCoder<TestType> jaxbCoder;
     public TestCoder(JAXBCoder<TestType> jaxbCoder) {
       this.jaxbCoder = jaxbCoder;


[6/6] beam git commit: Add default implementations of Coder methods to Coder

Posted by tg...@apache.org.
Add default implementations of Coder methods to Coder

Remove from StructuredCoder. These are sensible defaults implemented in
terms of other Coder methods.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63258c69
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63258c69
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63258c69

Branch: refs/heads/master
Commit: 63258c6986866b5bef58043b056d5c0dfec7303f
Parents: 655947b
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 10:10:40 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 13:30:51 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/coders/Coder.java  | 121 ++++++++++++++++---
 .../apache/beam/sdk/coders/StructuredCoder.java |  67 ++--------
 2 files changed, 108 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/63258c69/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 061e9e5..41e83ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -22,6 +22,9 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -206,6 +209,30 @@ public abstract class Coder<T> implements Serializable {
   public abstract void verifyDeterministic() throws Coder.NonDeterministicException;
 
   /**
+   * Verifies all of the provided coders are deterministic. If any are not, throws a {@link
+   * NonDeterministicException} for the {@code target} {@link Coder}.
+   */
+  public static void verifyDeterministic(Coder<?> target, String message, Iterable<Coder<?>> coders)
+      throws NonDeterministicException {
+    for (Coder<?> coder : coders) {
+      try {
+        coder.verifyDeterministic();
+      } catch (NonDeterministicException e) {
+        throw new NonDeterministicException(target, message, e);
+      }
+    }
+  }
+
+  /**
+   * Verifies all of the provided coders are deterministic. If any are not, throws a {@link
+   * NonDeterministicException} for the {@code target} {@link Coder}.
+   */
+  public static void verifyDeterministic(Coder<?> target, String message, Coder<?>... coders)
+      throws NonDeterministicException {
+    verifyDeterministic(target, message, Arrays.asList(coders));
+  }
+
+  /**
    * Returns {@code true} if this {@link Coder} is injective with respect to {@link Objects#equals}.
    *
    * <p>Whenever the encoded bytes of two values are equal, then the original values are equal
@@ -214,28 +241,50 @@ public abstract class Coder<T> implements Serializable {
    * <p>This condition is most notably false for arrays. More generally, this condition is false
    * whenever {@code equals()} compares object identity, rather than performing a
    * semantic/structural comparison.
+   *
+   * <p>By default, returns false.
    */
-  public abstract boolean consistentWithEquals();
+  public boolean consistentWithEquals() {
+    return false;
+  }
 
   /**
-   * Returns an object with an {@code Object.equals()} method that represents structural equality
-   * on the argument.
+   * Returns an object with an {@code Object.equals()} method that represents structural equality on
+   * the argument.
    *
    * <p>For any two values {@code x} and {@code y} of type {@code T}, if their encoded bytes are the
    * same, then it must be the case that {@code structuralValue(x).equals(@code structuralValue(y)}.
    *
    * <p>Most notably:
+   *
    * <ul>
    *   <li>The structural value for an array coder should perform a structural comparison of the
-   *   contents of the arrays, rather than the default behavior of comparing according to object
-   *   identity.
-   *   <li>The structural value for a coder accepting {@code null} should be a proper object with
-   *   an {@code equals()} method, even if the input value is {@code null}.
+   *       contents of the arrays, rather than the default behavior of comparing according to object
+   *       identity.
+   *   <li>The structural value for a coder accepting {@code null} should be a proper object with an
+   *       {@code equals()} method, even if the input value is {@code null}.
    * </ul>
    *
    * <p>See also {@link #consistentWithEquals()}.
+   *
+   * <p>By default, if this coder is {@link #consistentWithEquals()}, and the value is not null,
+   * returns the provided object. Otherwise, encodes the value into a {@code byte[]}, and returns
+   * an object that performs array equality on the encoded bytes.
    */
-  public abstract Object structuralValue(T value);
+  public Object structuralValue(T value) {
+    if (value != null && consistentWithEquals()) {
+      return value;
+    } else {
+      try {
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        encode(value, os, Context.OUTER);
+        return new StructuralByteArray(os.toByteArray());
+      } catch (Exception exn) {
+        throw new IllegalArgumentException(
+            "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+      }
+    }
+  }
 
   /**
    * Returns whether {@link #registerByteSizeObserver} cheap enough to
@@ -246,21 +295,44 @@ public abstract class Coder<T> implements Serializable {
    * <p>Not intended to be called by user code, but instead by
    * {@link PipelineRunner}
    * implementations.
+   *
+   * <p>By default, returns false. The default {@link #registerByteSizeObserver} implementation
+   *         invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
+   *         unless it is overridden. This is considered expensive.
    */
-  public abstract boolean isRegisterByteSizeObserverCheap(T value);
+  public boolean isRegisterByteSizeObserverCheap(T value) {
+    return isRegisterByteSizeObserverCheap(value, Context.NESTED);
+  }
 
   /**
-   * Returns whether {@link #registerByteSizeObserver} cheap enough to
-   * call for every element, that is, if this {@code Coder} can
-   * calculate the byte size of the element to be coded in roughly
-   * constant time (or lazily).
+   * {@inheritDoc}
    *
    * <p>Not intended to be called by user code, but instead by
    * {@link PipelineRunner}
    * implementations.
+   *
+   * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver}
+   *         invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
+   *         unless it is overridden. This is considered expensive.
    */
   @Deprecated
-  public abstract boolean isRegisterByteSizeObserverCheap(T value, Context context);
+  public boolean isRegisterByteSizeObserverCheap(T value, Context context) {
+    return false;
+  }
+
+  /**
+   * Returns the size in bytes of the encoded value using this coder.
+   */
+  protected long getEncodedElementByteSize(T value, Context context)
+      throws Exception {
+    try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
+      encode(value, os, context);
+      return os.getCount();
+    } catch (Exception exn) {
+      throw new IllegalArgumentException(
+          "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+    }
+  }
 
   /**
    * Notifies the {@code ElementByteSizeObserver} about the byte size
@@ -269,10 +341,14 @@ public abstract class Coder<T> implements Serializable {
    * <p>Not intended to be called by user code, but instead by
    * {@link PipelineRunner}
    * implementations.
+   *
+   * <p>By default, this notifies {@code observer} about the byte size
+   * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}.
    */
-  public abstract void registerByteSizeObserver(
-      T value, ElementByteSizeObserver observer)
-      throws Exception;
+  public void registerByteSizeObserver(T value, ElementByteSizeObserver observer)
+      throws Exception {
+    registerByteSizeObserver(value, observer, Context.NESTED);
+  }
 
   /**
    * Notifies the {@code ElementByteSizeObserver} about the byte size
@@ -283,15 +359,20 @@ public abstract class Coder<T> implements Serializable {
    * implementations.
    */
   @Deprecated
-  public abstract void registerByteSizeObserver(
+  public void registerByteSizeObserver(
       T value, ElementByteSizeObserver observer, Context context)
-      throws Exception;
+      throws Exception {
+    observer.update(getEncodedElementByteSize(value, context));
+  }
 
   /**
    * Returns the {@link TypeDescriptor} for the type encoded.
    */
   @Experimental(Kind.CODER_TYPE_ENCODING)
-  public abstract TypeDescriptor<T> getEncodedTypeDescriptor();
+  public TypeDescriptor<T> getEncodedTypeDescriptor(){
+    return (TypeDescriptor<T>)
+        TypeDescriptor.of(getClass()).resolveType(new TypeDescriptor<T>() {}.getType());
+  }
 
   /**
    * Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is

http://git-wip-us.apache.org/repos/asf/beam/blob/63258c69/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
index 0c72618..437f10d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,13 +24,15 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing
  * via the class name and recursively using {@link #getComponents}.
  *
+ * <p>A {@link StructuredCoder} should be defined purely in terms of its component coders, and
+ * contain no additional configuration.
+ *
  * <p>To extend {@link StructuredCoder}, override the following methods as appropriate:
  *
  * <ul>
@@ -101,12 +101,14 @@ public abstract class StructuredCoder<T> extends Coder<T> {
     return builder.toString();
   }
 
+  @Override
   public void encode(T value, OutputStream outStream)
       throws CoderException, IOException {
     encode(value, outStream, Coder.Context.NESTED);
   }
 
   @Deprecated
+  @Override
   public void encodeOuter(T value, OutputStream outStream)
       throws CoderException, IOException {
     encode(value, outStream, Coder.Context.OUTER);
@@ -122,11 +124,13 @@ public abstract class StructuredCoder<T> extends Coder<T> {
     }
   }
 
+  @Override
   public T decode(InputStream inStream) throws CoderException, IOException {
     return decode(inStream, Coder.Context.NESTED);
   }
 
   @Deprecated
+  @Override
   public T decodeOuter(InputStream inStream) throws CoderException, IOException {
     return decode(inStream, Coder.Context.OUTER);
   }
@@ -141,63 +145,6 @@ public abstract class StructuredCoder<T> extends Coder<T> {
     }
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver}
-   *         invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
-   *         unless it is overridden. This is considered expensive.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(T value) {
-    return isRegisterByteSizeObserverCheap(value, Context.NESTED);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver}
-   *         invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
-   *         unless it is overridden. This is considered expensive.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(T value, Context context) {
-    return false;
-  }
-
-  /**
-   * Returns the size in bytes of the encoded value using this coder.
-   */
-  protected long getEncodedElementByteSize(T value, Context context)
-      throws Exception {
-    try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
-      encode(value, os, context);
-      return os.getCount();
-    } catch (Exception exn) {
-      throw new IllegalArgumentException(
-          "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
-    }
-  }
-
-  @Override
-  public void registerByteSizeObserver(T value, ElementByteSizeObserver observer)
-      throws Exception {
-    registerByteSizeObserver(value, observer, Context.NESTED);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>For {@link StructuredCoder} subclasses, this notifies {@code observer} about the byte size
-   * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}.
-   */
-  @Override
-  public void registerByteSizeObserver(
-      T value, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    observer.update(getEncodedElementByteSize(value, context));
-  }
-
   protected void verifyDeterministic(String message, Iterable<Coder<?>> coders)
       throws NonDeterministicException {
     for (Coder<?> coder : coders) {


[5/6] beam git commit: Make CustomCoder extend Coder directly

Posted by tg...@apache.org.
Make CustomCoder extend Coder directly

CustomCoder in general consists of configuration beyond component
coders, and should not extend, for example, equality methods of
StructuredCoder.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c7f3e3c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c7f3e3c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c7f3e3c3

Branch: refs/heads/master
Commit: c7f3e3c3a00423dfcaca6e5b53ad4f84533a56f6
Parents: 987c2cb
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 10:13:41 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 13:30:51 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/coders/CustomCoder.java | 50 +++++++++++++++++++-
 1 file changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c7f3e3c3/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index 87bd531..f33e210 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.coders;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
@@ -33,8 +36,53 @@ import java.util.List;
  *
  * @param <T> the type of elements handled by this coder
  */
-public abstract class CustomCoder<T> extends StructuredCoder<T>
+public abstract class CustomCoder<T> extends Coder<T>
     implements Serializable {
+
+  @Override
+  public void encode(T value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Coder.Context.NESTED);
+  }
+
+  @Deprecated
+  @Override
+  public void encodeOuter(T value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Coder.Context.OUTER);
+  }
+
+  @Deprecated
+  public void encode(T value, OutputStream outStream, Coder.Context context)
+      throws CoderException, IOException {
+    if (context == Coder.Context.NESTED) {
+      encode(value, outStream);
+    } else {
+      encodeOuter(value, outStream);
+    }
+  }
+
+  @Override
+  public T decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Coder.Context.NESTED);
+  }
+
+  @Deprecated
+  @Override
+  public T decodeOuter(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Coder.Context.OUTER);
+  }
+
+  @Deprecated
+  public T decode(InputStream inStream, Coder.Context context)
+      throws CoderException, IOException {
+    if (context == Coder.Context.NESTED) {
+      return decode(inStream);
+    } else {
+      return decodeOuter(inStream);
+    }
+  }
+
   /**
    * {@inheritDoc}.
    *