You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/05 20:35:23 UTC

[3/6] beam git commit: Reparent many Coders to Atomic or StructuredCoder

Reparent many Coders to Atomic or StructuredCoder

These coders do not take configuration, or take configuration only in
terms of other Coders, and are appropriate to reparent.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/987c2cbc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/987c2cbc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/987c2cbc

Branch: refs/heads/master
Commit: 987c2cbca433f3e0ff12a637f2b0474e772c6beb
Parents: 63258c6
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 5 10:13:05 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 5 13:30:51 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/ApexStreamTuple.java |  5 ++-
 .../UnboundedReadFromBoundedSource.java         |  4 +-
 .../runners/core/construction/CodersTest.java   | 14 +------
 .../core/construction/PCollectionsTest.java     |  3 +-
 .../core/ElementAndRestrictionCoder.java        | 17 ++++++++-
 .../beam/runners/core/KeyedWorkItemCoder.java   |  4 +-
 .../beam/runners/core/TimerInternals.java       |  6 +--
 .../direct/CloningBundleFactoryTest.java        | 10 ++---
 .../beam/runners/direct/DirectRunnerTest.java   |  5 +--
 .../UnboundedReadEvaluatorFactoryTest.java      |  4 +-
 .../streaming/SingletonKeyedWorkItemCoder.java  |  4 +-
 .../runners/dataflow/BatchViewOverrides.java    |  8 ++--
 .../runners/dataflow/internal/IsmFormat.java    | 40 ++++++++++++++++----
 .../runners/dataflow/util/RandomAccessData.java |  4 +-
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 19 ++++++++++
 .../apache/beam/sdk/coders/BigDecimalCoder.java |  2 +-
 .../beam/sdk/coders/BigEndianIntegerCoder.java  |  2 +-
 .../beam/sdk/coders/BigEndianLongCoder.java     |  4 +-
 .../apache/beam/sdk/coders/BigIntegerCoder.java |  4 +-
 .../org/apache/beam/sdk/coders/BitSetCoder.java |  4 +-
 .../apache/beam/sdk/coders/ByteArrayCoder.java  |  6 +--
 .../org/apache/beam/sdk/coders/ByteCoder.java   |  2 +-
 .../apache/beam/sdk/coders/CoderFactories.java  |  9 +++--
 .../org/apache/beam/sdk/coders/DoubleCoder.java |  2 +-
 .../apache/beam/sdk/coders/DurationCoder.java   |  2 +-
 .../apache/beam/sdk/coders/InstantCoder.java    |  2 +-
 .../org/apache/beam/sdk/coders/KvCoder.java     |  4 +-
 .../org/apache/beam/sdk/coders/ListCoder.java   |  3 +-
 .../org/apache/beam/sdk/coders/MapCoder.java    |  2 +-
 .../apache/beam/sdk/coders/NullableCoder.java   |  6 +--
 .../apache/beam/sdk/coders/StringUtf8Coder.java |  2 +-
 .../beam/sdk/coders/TextualIntegerCoder.java    |  2 +-
 .../org/apache/beam/sdk/coders/VarIntCoder.java |  2 +-
 .../org/apache/beam/sdk/coders/VoidCoder.java   |  4 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  4 +-
 .../sdk/transforms/ApproximateQuantiles.java    | 26 ++++++++++---
 .../org/apache/beam/sdk/transforms/Combine.java | 12 ++++--
 .../apache/beam/sdk/transforms/CombineFns.java  |  4 +-
 .../org/apache/beam/sdk/transforms/Count.java   |  4 +-
 .../org/apache/beam/sdk/transforms/Mean.java    |  7 ++--
 .../org/apache/beam/sdk/transforms/Top.java     | 23 ++++++++++-
 .../beam/sdk/transforms/join/CoGbkResult.java   |  2 +-
 .../beam/sdk/transforms/join/UnionCoder.java    |  7 ++--
 .../beam/sdk/transforms/windowing/PaneInfo.java | 10 ++++-
 .../org/apache/beam/sdk/util/BitSetCoder.java   |  9 +++--
 .../org/apache/beam/sdk/util/WindowedValue.java |  7 ++--
 .../beam/sdk/values/TimestampedValue.java       | 13 +++++--
 .../beam/sdk/values/ValueInSingleWindow.java    |  4 +-
 .../beam/sdk/values/ValueWithRecordId.java      |  4 +-
 .../beam/sdk/coders/CoderRegistryTest.java      |  4 +-
 .../beam/sdk/coders/DelegateCoderTest.java      | 25 ------------
 .../beam/sdk/coders/NullableCoderTest.java      |  2 +-
 .../beam/sdk/testing/CoderPropertiesTest.java   | 37 ++++++++++++++++--
 .../apache/beam/sdk/testing/PAssertTest.java    |  4 +-
 .../sdk/testing/SerializableMatchersTest.java   |  4 +-
 .../beam/sdk/testing/WindowSupplierTest.java    |  6 +--
 .../beam/sdk/transforms/CombineFnsTest.java     |  4 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  6 +--
 .../apache/beam/sdk/transforms/CreateTest.java  |  6 +--
 .../beam/sdk/transforms/GroupByKeyTest.java     |  4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  6 +--
 .../apache/beam/sdk/transforms/ViewTest.java    |  4 +-
 .../transforms/reflect/DoFnInvokersTest.java    |  6 +--
 .../apache/beam/sdk/util/CoderUtilsTest.java    |  4 +-
 .../beam/sdk/util/SerializableUtilsTest.java    |  4 +-
 .../extensions/protobuf/ByteStringCoder.java    |  4 +-
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    | 10 ++---
 .../io/gcp/bigquery/TableDestinationCoder.java  |  4 +-
 .../sdk/io/gcp/bigquery/TableRowInfoCoder.java  |  4 +-
 .../sdk/io/gcp/bigquery/TableRowJsonCoder.java  |  4 +-
 .../io/gcp/bigquery/WriteBundlesToFiles.java    | 11 +++++-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |  4 +-
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 12 ++++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  4 +-
 .../beam/sdk/io/hadoop/WritableCoder.java       | 18 +++++++++
 .../beam/sdk/io/hbase/HBaseMutationCoder.java   |  4 +-
 .../beam/sdk/io/hbase/HBaseResultCoder.java     |  4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  6 +--
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     |  4 +-
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  4 +-
 .../org/apache/beam/sdk/io/xml/JAXBCoder.java   | 18 +++++++++
 .../apache/beam/sdk/io/xml/JAXBCoderTest.java   |  4 +-
 82 files changed, 369 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
index 4ce351b..4aa6ee8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
@@ -31,7 +31,7 @@ import java.util.Objects;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 
 /**
  * The common interface for all objects transmitted through streams.
@@ -149,7 +149,7 @@ public interface ApexStreamTuple<T> {
   /**
    * Coder for {@link ApexStreamTuple}.
    */
-  class ApexStreamTupleCoder<T> extends CustomCoder<ApexStreamTuple<T>> {
+  class ApexStreamTupleCoder<T> extends StructuredCoder<ApexStreamTuple<T>> {
     private static final long serialVersionUID = 1L;
     final Coder<T> valueCoder;
 
@@ -194,6 +194,7 @@ public interface ApexStreamTuple<T> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
+          this,
           this.getClass().getSimpleName() + " requires a deterministic valueCoder",
           valueCoder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 0ea13b8..1424b8b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -35,10 +35,10 @@ import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.Read;
@@ -203,7 +203,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     }
 
     @VisibleForTesting
-    static class CheckpointCoder<T> extends CustomCoder<Checkpoint<T>> {
+    static class CheckpointCoder<T> extends StructuredCoder<Checkpoint<T>> {
 
       // The coder for a list of residual elements and their timestamps
       private final Coder<List<TimestampedValue<T>>> elemsCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index 32a78fa..765723c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -30,11 +30,11 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
@@ -149,7 +149,7 @@ public class CodersTest {
 
     static class Record implements Serializable {}
 
-    private static class RecordCoder extends CustomCoder<Record> {
+    private static class RecordCoder extends AtomicCoder<Record> {
       @Override
       public void encode(Record value, OutputStream outStream, Context context)
           throws CoderException, IOException {}
@@ -159,16 +159,6 @@ public class CodersTest {
           throws CoderException, IOException {
         return new Record();
       }
-
-      @Override
-      public boolean equals(Object other) {
-        return other != null && getClass().equals(other.getClass());
-      }
-
-      @Override
-      public int hashCode() {
-        return getClass().hashCode();
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
index c177c58..2c45cbd 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Collections;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -158,7 +159,7 @@ public class PCollectionsTest {
 
     @Override
     public Coder<BoundedWindow> windowCoder() {
-      return new CustomCoder<BoundedWindow>() {
+      return new AtomicCoder<BoundedWindow>() {
         @Override public void verifyDeterministic() {}
 
         @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 64c1e14..83c4e62 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -17,18 +17,20 @@
  */
 package org.apache.beam.runners.core;
 
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 
 /** A {@link Coder} for {@link ElementAndRestriction}. */
 @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
 public class ElementAndRestrictionCoder<ElementT, RestrictionT>
-    extends CustomCoder<ElementAndRestriction<ElementT, RestrictionT>> {
+    extends StructuredCoder<ElementAndRestriction<ElementT, RestrictionT>> {
   private final Coder<ElementT> elementCoder;
   private final Coder<RestrictionT> restrictionCoder;
 
@@ -65,6 +67,17 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT>
     return ElementAndRestriction.of(key, value);
   }
 
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of(elementCoder, restrictionCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    elementCoder.verifyDeterministic();
+    restrictionCoder.verifyDeterministic();
+  }
+
   public Coder<ElementT> getElementCoder() {
     return elementCoder;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index fddf7fa..e1872b5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -26,8 +26,8 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 /**
  * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}.
  */
-public class KeyedWorkItemCoder<K, ElemT> extends CustomCoder<KeyedWorkItem<K, ElemT>> {
+public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem<K, ElemT>> {
   /**
    * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
    * coder.

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 21fe430..888c11f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -27,9 +27,9 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -224,7 +224,7 @@ public interface TimerInternals {
   /**
    * A {@link Coder} for {@link TimerData}.
    */
-  class TimerDataCoder extends CustomCoder<TimerData> {
+  class TimerDataCoder extends StructuredCoder<TimerData> {
     private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
     private static final InstantCoder INSTANT_CODER = InstantCoder.of();
     private final Coder<? extends BoundedWindow> windowCoder;
@@ -266,7 +266,7 @@ public interface TimerInternals {
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic("window coder must be deterministic", windowCoder);
+      verifyDeterministic(this, "window coder must be deterministic", windowCoder);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index 7d037d1..33d171e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -173,7 +173,7 @@ public class CloningBundleFactoryTest {
   }
 
   static class Record {}
-  static class RecordNoEncodeCoder extends CustomCoder<Record> {
+  static class RecordNoEncodeCoder extends AtomicCoder<Record> {
 
     @Override
     public void encode(
@@ -192,7 +192,7 @@ public class CloningBundleFactoryTest {
     }
   }
 
-  static class RecordNoDecodeCoder extends CustomCoder<Record> {
+  static class RecordNoDecodeCoder extends AtomicCoder<Record> {
     @Override
     public void encode(
         Record value,
@@ -208,7 +208,7 @@ public class CloningBundleFactoryTest {
     }
   }
 
-  private static class RecordStructuralValueCoder extends CustomCoder<Record> {
+  private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
     @Override
     public void encode(
         Record value,
@@ -240,7 +240,7 @@ public class CloningBundleFactoryTest {
   }
 
   private static class RecordNotConsistentWithEqualsStructuralValueCoder
-      extends CustomCoder<Record> {
+      extends AtomicCoder<Record> {
     @Override
     public void encode(
         Record value,

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 428c6fc..0fe9585 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -44,9 +44,9 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
@@ -523,8 +523,7 @@ public class DirectRunnerTest implements Serializable {
     p.run();
   }
 
-  private static class LongNoDecodeCoder extends CustomCoder<Long> {
-
+  private static class LongNoDecodeCoder extends AtomicCoder<Long> {
     @Override
     public void encode(
         Long value, OutputStream outStream, Context context) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index ceb078b..b9ba7f4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -46,10 +46,10 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
 import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
@@ -586,7 +586,7 @@ public class UnboundedReadEvaluatorFactoryTest {
       return finalized;
     }
 
-    public static class Coder extends CustomCoder<TestCheckpointMark> {
+    public static class Coder extends AtomicCoder<TestCheckpointMark> {
       @Override
       public void encode(
           TestCheckpointMark value,

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index c73700f..f218693 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -26,7 +26,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.util.WindowedValue;
  * Singleton keyed work item coder.
  */
 public class SingletonKeyedWorkItemCoder<K, ElemT>
-    extends CustomCoder<SingletonKeyedWorkItem<K, ElemT>> {
+    extends StructuredCoder<SingletonKeyedWorkItem<K, ElemT>> {
   /**
    * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
    * coder.

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ef2bfed..ecd0365 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -50,11 +50,11 @@ import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -1335,7 +1335,7 @@ class BatchViewOverrides {
    * A {@link Coder} for {@link TransformedMap}s.
    */
   static class TransformedMapCoder<K, V1, V2>
-      extends CustomCoder<TransformedMap<K, V1, V2>> {
+      extends StructuredCoder<TransformedMap<K, V1, V2>> {
     private final Coder<Function<V1, V2>> transformCoder;
     private final Coder<Map<K, V1>> originalMapCoder;
 
@@ -1373,8 +1373,8 @@ class BatchViewOverrides {
     @Override
     public void verifyDeterministic()
         throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-      verifyDeterministic("Expected transform coder to be deterministic.", transformCoder);
-      verifyDeterministic("Expected map coder to be deterministic.", originalMapCoder);
+      verifyDeterministic(this, "Expected transform coder to be deterministic.", transformCoder);
+      verifyDeterministic(this, "Expected map coder to be deterministic.", originalMapCoder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index fbfe49a..aed514a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -32,14 +32,17 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.util.RandomAccessData;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.util.VarInt;
@@ -356,8 +359,9 @@ public class IsmFormat {
 
     @Override
     public void verifyDeterministic() throws Coder.NonDeterministicException {
-      verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
-      verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
+      verifyDeterministic(
+          this, "Key component coders expected to be deterministic.", keyComponentCoders);
+      verifyDeterministic(this, "Value coder expected to be deterministic.", valueCoder);
     }
 
     @Override
@@ -393,6 +397,28 @@ public class IsmFormat {
       }
       return super.structuralValue(record);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (!(other instanceof IsmRecordCoder)) {
+        return false;
+      }
+      IsmRecordCoder<?> that = (IsmRecordCoder<?>) other;
+      return Objects.equals(this.numberOfShardKeyCoders, that.numberOfShardKeyCoders)
+          && Objects.equals(
+              this.numberOfMetadataShardKeyCoders, that.numberOfMetadataShardKeyCoders)
+          && Objects.equals(this.keyComponentCoders, that.keyComponentCoders)
+          && Objects.equals(this.valueCoder, that.valueCoder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(
+          numberOfShardKeyCoders, numberOfMetadataShardKeyCoders, keyComponentCoders, valueCoder);
+    }
   }
 
   /**
@@ -450,7 +476,7 @@ public class IsmFormat {
    * A coder for metadata key component. Can be used to wrap key component coder allowing for
    * the metadata key component to be used as a place holder instead of an actual key.
    */
-  public static class MetadataKeyCoder<K> extends CustomCoder<K> {
+  public static class MetadataKeyCoder<K> extends StructuredCoder<K> {
     public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
       checkNotNull(keyCoder);
       return new MetadataKeyCoder<>(keyCoder);
@@ -497,7 +523,7 @@ public class IsmFormat {
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic("Expected key coder to be deterministic", keyCoder);
+      verifyDeterministic(this, "Expected key coder to be deterministic", keyCoder);
     }
   }
 
@@ -584,7 +610,7 @@ public class IsmFormat {
    *   <li>indexOffset (variable length long encoding)</li>
    * </ul>
    */
-  public static class IsmShardCoder extends CustomCoder<IsmShard> {
+  public static class IsmShardCoder extends AtomicCoder<IsmShard> {
     private static final IsmShardCoder INSTANCE = new IsmShardCoder();
 
     /** Returns an IsmShardCoder. */
@@ -649,7 +675,7 @@ public class IsmFormat {
   }
 
   /** A {@link Coder} for {@link KeyPrefix}. */
-  public static final class KeyPrefixCoder extends CustomCoder<KeyPrefix> {
+  public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
     private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
 
     public static KeyPrefixCoder of() {
@@ -721,7 +747,7 @@ public class IsmFormat {
   }
 
   /** A {@link Coder} for {@link Footer}. */
-  public static final class FooterCoder extends CustomCoder<Footer> {
+  public static final class FooterCoder extends AtomicCoder<Footer> {
     private static final FooterCoder INSTANCE = new FooterCoder();
 
     public static FooterCoder of() {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
index 66548e2..4e94515 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
@@ -30,10 +30,10 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Comparator;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.VarInt;
 
 /**
@@ -55,7 +55,7 @@ public class RandomAccessData {
    *
    * <p>This coder does not support encoding positive infinity.
    */
-  public static class RandomAccessDataCoder extends CustomCoder<RandomAccessData> {
+  public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
     private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
 
     public static RandomAccessDataCoder of() {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 1e01f1a..2aa2b44 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
@@ -681,4 +682,22 @@ public class AvroCoder<T> extends CustomCoder<T> {
       throw new IllegalArgumentException("Unable to get field " + name + " from " + originalClazz);
     }
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+    if (!(other instanceof AvroCoder)) {
+      return false;
+    }
+    AvroCoder<?> that = (AvroCoder<?>) other;
+    return Objects.equals(this.schemaSupplier.get(), that.schemaSupplier.get())
+        && Objects.equals(this.typeDescriptor, that.typeDescriptor);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(schemaSupplier.get(), typeDescriptor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index d628203..aadf085 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -32,7 +32,7 @@ import java.math.MathContext;
  * {@link BigInteger}, when scaled (with unlimited precision, aka {@link MathContext#UNLIMITED}),
  * yields the expected {@link BigDecimal}.
  */
-public class BigDecimalCoder extends CustomCoder<BigDecimal> {
+public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
 
   public static BigDecimalCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
index 81c5e94..c3c7a96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link BigEndianIntegerCoder} encodes {@link Integer Integers} in 4 bytes, big-endian.
  */
-public class BigEndianIntegerCoder extends CustomCoder<Integer> {
+public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
 
   public static BigEndianIntegerCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
index 173e910..5ef4878 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -30,9 +29,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link BigEndianLongCoder} encodes {@link Long}s in 8 bytes, big-endian.
  */
-public class BigEndianLongCoder extends CustomCoder<Long> {
+public class BigEndianLongCoder extends AtomicCoder<Long> {
 
-  @JsonCreator
   public static BigEndianLongCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
index a739da7..6d14d17 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
@@ -28,7 +28,7 @@ import java.math.BigInteger;
  * A {@link BigIntegerCoder} encodes a {@link BigInteger} as a byte array containing the big endian
  * two's-complement representation, encoded via {@link ByteArrayCoder}.
  */
-public class BigIntegerCoder extends CustomCoder<BigInteger> {
+public class BigIntegerCoder extends AtomicCoder<BigInteger> {
 
   public static BigIntegerCoder of() {
     return INSTANCE;
@@ -55,7 +55,7 @@ public class BigIntegerCoder extends CustomCoder<BigInteger> {
   }
 
   @Override
-  public void verifyDeterministic() throws NonDeterministicException {
+  public void verifyDeterministic() {
     BYTE_ARRAY_CODER.verifyDeterministic();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
index 5a4db24..f49776b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
@@ -25,7 +25,7 @@ import java.util.BitSet;
 /**
  * Coder for {@link BitSet}.
  */
-public class BitSetCoder extends CustomCoder<BitSet> {
+public class BitSetCoder extends AtomicCoder<BitSet> {
   private static final BitSetCoder INSTANCE = new BitSetCoder();
   private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
 
@@ -53,7 +53,7 @@ public class BitSetCoder extends CustomCoder<BitSet> {
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
     verifyDeterministic(
-        "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
+        this, "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index cba8d49..28cb627 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.io.ByteStreams;
 import java.io.IOException;
 import java.io.InputStream;
@@ -40,9 +39,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * encoded via a {@link VarIntCoder}.</li>
  * </ul>
  */
-public class ByteArrayCoder extends StructuredCoder<byte[]> {
+public class ByteArrayCoder extends AtomicCoder<byte[]> {
 
-  @JsonCreator
   public static ByteArrayCoder of() {
     return INSTANCE;
   }
@@ -117,7 +115,7 @@ public class ByteArrayCoder extends StructuredCoder<byte[]> {
   }
 
   @Override
-  public void verifyDeterministic() throws NonDeterministicException {}
+  public void verifyDeterministic() {}
 
   /**
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
index 1a1be64..6e4318e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link ByteCoder} encodes {@link Byte} values in 1 byte using Java serialization.
  */
-public class ByteCoder extends CustomCoder<Byte> {
+public class ByteCoder extends AtomicCoder<Byte> {
 
   public static ByteCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
index 2a1d792..4f05c95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
@@ -239,11 +239,12 @@ public final class CoderFactories {
     }
 
     /**
-     * If {@code coderType} is a subclass of {@link Coder} for a specific
-     * type {@code T}, returns {@code T.class}. Otherwise, raises IllegalArgumentException.
+     * If {@code coderType} is a subclass of {@link Coder} for a specific type {@code T}, returns
+     * {@code T.class}. Otherwise, raises IllegalArgumentException.
      */
-    private <T> TypeDescriptor<T> getCodedType(TypeDescriptor<? extends Coder> coderType) {
-      TypeDescriptor<?> coderSupertype = coderType.getSupertype(Coder.class);
+    private <T, CoderT extends Coder> TypeDescriptor<T> getCodedType(
+        TypeDescriptor<CoderT> coderType) {
+      TypeDescriptor<? super CoderT> coderSupertype = coderType.getSupertype(Coder.class);
       ParameterizedType coderIface = (ParameterizedType) coderSupertype.getType();
       @SuppressWarnings("unchecked")
       TypeDescriptor<T> token =

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
index 06e7dae..12bc5e8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link DoubleCoder} encodes {@link Double} values in 8 bytes using Java serialization.
  */
-public class DoubleCoder extends CustomCoder<Double> {
+public class DoubleCoder extends AtomicCoder<Double> {
 
   public static DoubleCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index 10a83ef..7b49d1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -29,7 +29,7 @@ import org.joda.time.ReadableDuration;
  * A {@link Coder} that encodes a joda {@link Duration} as a {@link Long} using the format of
  * {@link VarLongCoder}.
  */
-public class DurationCoder extends CustomCoder<ReadableDuration> {
+public class DurationCoder extends AtomicCoder<ReadableDuration> {
 
   public static DurationCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index cfd1979..56ed12b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -29,7 +29,7 @@ import org.joda.time.Instant;
  * A {@link Coder} for joda {@link Instant} that encodes it as a big endian {@link Long}
  * shifted such that lexicographic ordering of the bytes corresponds to chronological order.
  */
-public class InstantCoder extends CustomCoder<Instant> {
+public class InstantCoder extends AtomicCoder<Instant> {
   public static InstantCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 8a689f7..35b7449 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -89,8 +89,8 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
 
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic("Key coder must be deterministic", getKeyCoder());
-    verifyDeterministic("Value coder must be deterministic", getValueCoder());
+    verifyDeterministic(this, "Key coder must be deterministic", getKeyCoder());
+    verifyDeterministic(this, "Value coder must be deterministic", getValueCoder());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
index 32467d2..70bbf93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
@@ -58,8 +58,7 @@ public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
    */
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "ListCoder.elemCoder must be deterministic", getElemCoder());
+    verifyDeterministic(this, "ListCoder.elemCoder must be deterministic", getElemCoder());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index e2c4e28..da2bf50 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.values.TypeParameter;
  * @param <K> the type of the keys of the KVs being transcoded
  * @param <V> the type of the values of the KVs being transcoded
  */
-public class MapCoder<K, V> extends CustomCoder<Map<K, V>> {
+public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
   /**
    * Produces a MapCoder with the given keyCoder and valueCoder.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index 747d91c..d1eea9a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * @param <T> the type of the values being transcoded
  */
-public class NullableCoder<T> extends CustomCoder<T> {
+public class NullableCoder<T> extends StructuredCoder<T> {
   public static <T> NullableCoder<T> of(Coder<T> valueCoder) {
     if (valueCoder instanceof NullableCoder) {
       return (NullableCoder<T>) valueCoder;
@@ -93,11 +93,11 @@ public class NullableCoder<T> extends CustomCoder<T> {
   /**
    * {@code NullableCoder} is deterministic if the nested {@code Coder} is.
    *
-   * {@inheritDoc}
+   * <p>{@inheritDoc}
    */
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic("Value coder must be deterministic", valueCoder);
+    verifyDeterministic(this, "Value coder must be deterministic", valueCoder);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index f0a0969..42931ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * If in a nested context, prefixes the string with an integer length field,
  * encoded via a {@link VarIntCoder}.
  */
-public class StringUtf8Coder extends CustomCoder<String> {
+public class StringUtf8Coder extends AtomicCoder<String> {
 
   public static StringUtf8Coder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
index 91a46ea..9743c4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * A {@link Coder} that encodes {@code Integer Integers} as the ASCII bytes of
  * their textual, decimal, representation.
  */
-public class TextualIntegerCoder extends CustomCoder<Integer> {
+public class TextualIntegerCoder extends AtomicCoder<Integer> {
 
   public static TextualIntegerCoder of() {
     return new TextualIntegerCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
index fcc0335..30f9c09 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * numbers always take 5 bytes, so {@link BigEndianIntegerCoder} may be preferable for
  * integers that are known to often be large or negative.
  */
-public class VarIntCoder extends CustomCoder<Integer> {
+public class VarIntCoder extends AtomicCoder<Integer> {
 
   public static VarIntCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
index a65fa5e..829bd20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.InputStream;
 import java.io.OutputStream;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -25,9 +24,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link Coder} for {@link Void}. Uses zero bytes per {@link Void}.
  */
-public class VoidCoder extends CustomCoder<Void> {
+public class VoidCoder extends AtomicCoder<Void> {
 
-  @JsonCreator
   public static VoidCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 0daf5dc..20fab9b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -43,8 +43,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
@@ -938,7 +938,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   /**
    * A coder for {@link FileResult} objects.
    */
-  public static final class FileResultCoder extends CustomCoder<FileResult> {
+  public static final class FileResultCoder extends AtomicCoder<FileResult> {
     private static final FileResultCoder INSTANCE = new FileResultCoder();
     private final NullableCoder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 5432f09..b05f223 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -762,13 +763,28 @@ public class ApproximateQuantiles {
     }
 
     @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (!(other instanceof QuantileStateCoder)) {
+        return false;
+      }
+      QuantileStateCoder<?, ?> that = (QuantileStateCoder<?, ?>) other;
+      return Objects.equals(this.elementCoder, that.elementCoder)
+          && Objects.equals(this.compareFn, that.compareFn);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(elementCoder, compareFn);
+    }
+
+    @Override
     public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic(this, "QuantileState.ElementCoder must be deterministic", elementCoder);
       verifyDeterministic(
-          "QuantileState.ElementCoder must be deterministic",
-          elementCoder);
-      verifyDeterministic(
-          "QuantileState.ElementListCoder must be deterministic",
-          elementListCoder);
+          this, "QuantileState.ElementListCoder must be deterministic", elementListCoder);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 666db3b..b9cdbd5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,10 +36,10 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DelegateCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -522,7 +523,7 @@ public class Combine {
   /**
    * A {@link Coder} for a {@link Holder}.
    */
-  private static class HolderCoder<V> extends CustomCoder<Holder<V>> {
+  private static class HolderCoder<V> extends StructuredCoder<Holder<V>> {
 
     private Coder<V> valueCoder;
 
@@ -552,6 +553,11 @@ public class Combine {
     }
 
     @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.singletonList(valueCoder);
+    }
+
+    @Override
     public void verifyDeterministic() throws NonDeterministicException {
       valueCoder.verifyDeterministic();
     }
@@ -1954,7 +1960,7 @@ public class Combine {
       }
 
       private static class InputOrAccumCoder<InputT, AccumT>
-          extends CustomCoder<InputOrAccum<InputT, AccumT>> {
+          extends StructuredCoder<InputOrAccum<InputT, AccumT>> {
 
         private final Coder<InputT> inputCoder;
         private final Coder<AccumT> accumCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index cc02dcf..d4c97bc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
@@ -524,7 +524,7 @@ public class CombineFns {
     }
   }
 
-  private static class ComposedAccumulatorCoder extends CustomCoder<Object[]> {
+  private static class ComposedAccumulatorCoder extends StructuredCoder<Object[]> {
     private List<Coder<Object>> coders;
     private int codersCount;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 78a6cd1..753e14c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -23,10 +23,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
 import java.util.Iterator;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.util.VarInt;
@@ -167,7 +167,7 @@ public class Count {
     @Override
     public Coder<long[]> getAccumulatorCoder(CoderRegistry registry,
                                              Coder<T> inputCoder) {
-      return new CustomCoder<long[]>() {
+      return new AtomicCoder<long[]>() {
         @Override
         public void encode(long[] value, OutputStream outStream, Context context)
             throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index a6808cf..a309954 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -22,11 +22,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Objects;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
 
@@ -180,9 +180,8 @@ public class Mean {
     }
   }
 
-  static class CountSumCoder<NumT extends Number>
-  extends CustomCoder<CountSum<NumT>> {
-     private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
+  static class CountSumCoder<NumT extends Number> extends AtomicCoder<CountSum<NumT>> {
+    private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
      private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
 
      @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index e42c0b2..9d5db74 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -551,8 +552,7 @@ public class Top {
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic(
-          "HeapCoder requires a deterministic list coder", listCoder);
+      verifyDeterministic(this, "HeapCoder requires a deterministic list coder", listCoder);
     }
 
     @Override
@@ -568,5 +568,24 @@ public class Top {
             throws Exception {
       listCoder.registerByteSizeObserver(value.asList(), observer, context);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (!(other instanceof BoundedHeapCoder)) {
+        return false;
+      }
+      BoundedHeapCoder<?, ?> that = (BoundedHeapCoder<?, ?>) other;
+      return Objects.equals(this.compareFn, that.compareFn)
+          && Objects.equals(this.listCoder, that.listCoder)
+          && this.maximumSize == that.maximumSize;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(compareFn, listCoder, maximumSize);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 02e1185..e9a3571 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -300,7 +300,7 @@ public class CoGbkResult {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
-          "CoGbkResult requires the union coder to be deterministic", unionCoder);
+          this, "CoGbkResult requires the union coder to be deterministic", unionCoder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
index f411cd1..4a2a286 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
@@ -23,14 +23,14 @@ import java.io.OutputStream;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 
 /**
  * A UnionCoder encodes RawUnionValues.
  */
-public class UnionCoder extends CustomCoder<RawUnionValue> {
+public class UnionCoder extends StructuredCoder<RawUnionValue> {
   // TODO: Think about how to integrate this with a schema object (i.e.
   // a tuple of tuple tags).
   /**
@@ -134,7 +134,6 @@ public class UnionCoder extends CustomCoder<RawUnionValue> {
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
     verifyDeterministic(
-        "UnionCoder is only deterministic if all element coders are",
-        elementCoders);
+        this, "UnionCoder is only deterministic if all element coders are", elementCoders);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index faf3ca9..79ce2f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -26,9 +26,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Objects;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -307,7 +307,7 @@ public final class PaneInfo {
   /**
    * A Coder for encoding PaneInfo instances.
    */
-  public static class PaneInfoCoder extends CustomCoder<PaneInfo> {
+  public static class PaneInfoCoder extends AtomicCoder<PaneInfo> {
     private enum Encoding {
       FIRST,
       ONE_INDEX,
@@ -340,6 +340,12 @@ public final class PaneInfo {
 
     public static final PaneInfoCoder INSTANCE = new PaneInfoCoder();
 
+    public static PaneInfoCoder of() {
+      return INSTANCE;
+    }
+
+    private PaneInfoCoder() {}
+
     @Override
     public void encode(PaneInfo value, final OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
index b0e9b5c..b646bf6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
@@ -21,15 +21,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.BitSet;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 
 /**
  * Coder for the BitSet used to track child-trigger finished states.
+ *
+ * @deprecated use {@link org.apache.beam.sdk.coders.BitSetCoder} instead
  */
 @Deprecated
-public class BitSetCoder extends CustomCoder<BitSet> {
+public class BitSetCoder extends AtomicCoder<BitSet> {
 
   private static final BitSetCoder INSTANCE = new BitSetCoder();
   private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
@@ -54,8 +56,7 @@ public class BitSetCoder extends CustomCoder<BitSet> {
 
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
+    BYTE_ARRAY_CODER.verifyDeterministic();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 13e499d..1e72550 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -663,11 +663,9 @@ public abstract class WindowedValue<T> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
-          "FullWindowedValueCoder requires a deterministic valueCoder",
-          valueCoder);
+          this, "FullWindowedValueCoder requires a deterministic valueCoder", valueCoder);
       verifyDeterministic(
-          "FullWindowedValueCoder requires a deterministic windowCoder",
-          windowCoder);
+          this, "FullWindowedValueCoder requires a deterministic windowCoder", windowCoder);
     }
 
     @Override
@@ -728,6 +726,7 @@ public abstract class WindowedValue<T> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
+          this,
           "ValueOnlyWindowedValueCoder requires a deterministic valueCoder",
           valueCoder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index cde9a40..a9f3929 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -23,11 +23,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.joda.time.Instant;
 
@@ -85,7 +86,7 @@ public class TimestampedValue<V> {
   /////////////////////////////////////////////////////////////////////////////
 
   /** A {@link Coder} for {@link TimestampedValue}. */
-  public static class TimestampedValueCoder<T> extends CustomCoder<TimestampedValue<T>> {
+  public static class TimestampedValueCoder<T> extends StructuredCoder<TimestampedValue<T>> {
 
     private final Coder<T> valueCoder;
 
@@ -119,8 +120,7 @@ public class TimestampedValue<V> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
-          "TimestampedValueCoder requires a deterministic valueCoder",
-          valueCoder);
+          this, "TimestampedValueCoder requires a deterministic valueCoder", valueCoder);
     }
 
     @Override
@@ -141,6 +141,11 @@ public class TimestampedValue<V> {
       return new TypeDescriptor<TimestampedValue<T>>() {}.where(
           new TypeParameter<T>() {}, valueCoder.getEncodedTypeDescriptor());
     }
+
+    @Override
+    public List<? extends Coder<?>> getComponents() {
+      return Collections.singletonList(valueCoder);
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index 1fd356b..3ecbaa2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -24,8 +24,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.joda.time.Instant;
@@ -56,7 +56,7 @@ public abstract class ValueInSingleWindow<T> {
   }
 
   /** A coder for {@link ValueInSingleWindow}. */
-  public static class Coder<T> extends CustomCoder<ValueInSingleWindow<T>> {
+  public static class Coder<T> extends StructuredCoder<ValueInSingleWindow<T>> {
     private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
     private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
index 0d92f40..3f057e1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
@@ -27,7 +27,7 @@ import java.util.Objects;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**
@@ -85,7 +85,7 @@ public class ValueWithRecordId<ValueT> {
    * A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}.
    */
   public static class ValueWithRecordIdCoder<ValueT>
-      extends CustomCoder<ValueWithRecordId<ValueT>> {
+      extends StructuredCoder<ValueWithRecordId<ValueT>> {
     public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) {
       return new ValueWithRecordIdCoder<>(valueCoder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index fe21a1c..5107355 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -99,7 +99,7 @@ public class CoderRegistryTest {
   }
 
   @SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes
-  private class MyListCoder extends CustomCoder<List> {
+  private class MyListCoder extends AtomicCoder<List> {
     @Override
     public void encode(List value, OutputStream outStream, Context context)
         throws CoderException, IOException {
@@ -441,7 +441,7 @@ public class CoderRegistryTest {
 
   private static class MyValue { }
 
-  private static class MyValueCoder extends CustomCoder<MyValue> {
+  private static class MyValueCoder extends AtomicCoder<MyValue> {
 
     private static final MyValueCoder INSTANCE = new MyValueCoder();
     private static final TypeDescriptor<MyValue> TYPE_DESCRIPTOR = TypeDescriptor.of(MyValue.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
index 5ff272f..8aeb22a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
@@ -24,8 +24,6 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
@@ -108,29 +106,6 @@ public class DelegateCoderTest implements Serializable {
   private static final String TEST_ENCODING_ID = "test-encoding-id";
   private static final String TEST_ALLOWED_ENCODING = "test-allowed-encoding";
 
-  private static class TestAllowedEncodingsCoder extends CustomCoder<Integer> {
-
-    @Override
-    public void encode(Integer value, OutputStream outstream, Context context) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Integer decode(InputStream instream, Context context) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void verifyDeterministic() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Collections.emptyList();
-    }
-  }
-
   @Test
   public void testCoderEquals() throws Exception {
     DelegateCoder.CodingFunction<Integer, Integer> identityFn =

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index 894d2d1..c0a4bed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -169,7 +169,7 @@ public class NullableCoderTest {
     assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(String.class)));
   }
 
-  private static class EntireStreamExpectingCoder extends CustomCoder<String> {
+  private static class EntireStreamExpectingCoder extends AtomicCoder<String> {
     @Override
     public void encode(
         String value, OutputStream outStream, Context context) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
index f337f36..164d221 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
@@ -24,6 +24,7 @@ import com.google.common.base.Strings;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -47,7 +48,7 @@ public class CoderPropertiesTest {
   }
 
   /** A coder that says it is not deterministic but actually is. */
-  public static class NonDeterministicCoder extends CustomCoder<String> {
+  public static class NonDeterministicCoder extends AtomicCoder<String> {
     @Override
     public void encode(String value, OutputStream outStream, Context context)
         throws CoderException, IOException {
@@ -59,17 +60,23 @@ public class CoderPropertiesTest {
         throws CoderException, IOException {
       return StringUtf8Coder.of().decode(inStream, context);
     }
+
+    public void verifyDeterministic() throws NonDeterministicException {
+      throw new NonDeterministicException(this, "Not Deterministic");
+    }
   }
 
   @Test
   public void testNonDeterministicCoder() throws Exception {
     try {
       CoderProperties.coderDeterministic(new NonDeterministicCoder(), "TestData", "TestData");
-      fail("Expected AssertionError");
     } catch (AssertionError error) {
       assertThat(error.getMessage(),
           CoreMatchers.containsString("Expected that the coder is deterministic"));
+      // success!
+      return;
     }
+    fail("Expected AssertionError");
   }
 
   @Test
@@ -84,7 +91,7 @@ public class CoderPropertiesTest {
   }
 
   /** A coder that is non-deterministic because it adds a string to the value. */
-  private static class BadDeterminsticCoder extends CustomCoder<String> {
+  private static class BadDeterminsticCoder extends AtomicCoder<String> {
     public BadDeterminsticCoder() {
     }
 
@@ -141,6 +148,17 @@ public class CoderPropertiesTest {
       String decodedValue = StringUtf8Coder.of().decode(inStream, context);
       return decodedValue.substring(0, decodedValue.length() - changedState);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof StateChangingSerializingCoder
+          && ((StateChangingSerializingCoder) other).changedState == this.changedState;
+    }
+
+    @Override
+    public int hashCode() {
+      return changedState;
+    }
   }
 
   @Test
@@ -175,6 +193,17 @@ public class CoderPropertiesTest {
         throws CoderException, IOException {
       return StringUtf8Coder.of().decode(inStream, context);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      return (other instanceof ForgetfulSerializingCoder)
+          && ((ForgetfulSerializingCoder) other).lostState == lostState;
+    }
+
+    @Override
+    public int hashCode() {
+      return lostState;
+    }
   }
 
   @Test
@@ -185,7 +214,7 @@ public class CoderPropertiesTest {
   }
 
   /** A coder which closes the underlying stream during encoding and decoding. */
-  public static class ClosingCoder extends CustomCoder<String> {
+  public static class ClosingCoder extends AtomicCoder<String> {
     @Override
     public void encode(String value, OutputStream outStream, Context context) throws IOException {
       outStream.close();