You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/05 20:35:22 UTC

[2/6] beam git commit: Reparent many Coders to Atomic or StructuredCoder

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index cfe7436..2ef892c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -35,8 +35,8 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
@@ -88,7 +88,7 @@ public class PAssertTest implements Serializable {
     }
   }
 
-  private static class NotSerializableObjectCoder extends CustomCoder<NotSerializableObject> {
+  private static class NotSerializableObjectCoder extends AtomicCoder<NotSerializableObject> {
     private NotSerializableObjectCoder() { }
     private static final NotSerializableObjectCoder INSTANCE = new NotSerializableObjectCoder();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
index ddc92d6..db5ff2e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
@@ -29,8 +29,8 @@ import com.google.common.collect.ImmutableList;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.hamcrest.Matchers;
@@ -151,7 +151,7 @@ public class SerializableMatchersTest implements Serializable {
     }
   }
 
-  private static class NotSerializableClassCoder extends CustomCoder<NotSerializableClass> {
+  private static class NotSerializableClassCoder extends AtomicCoder<NotSerializableClass> {
     @Override
     public void encode(NotSerializableClass value, OutputStream outStream, Coder.Context context) {
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
index 38a2fa2..546683b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
@@ -24,8 +24,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collections;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.SerializableUtils;
@@ -72,12 +72,12 @@ public class WindowSupplierTest {
         Collections.<BoundedWindow>singleton(window));
   }
 
-  private static class FailingCoder extends CustomCoder<BoundedWindow>  {
+  private static class FailingCoder extends AtomicCoder<BoundedWindow> {
     @Override
     public void encode(
         BoundedWindow value, OutputStream outStream, Context context)
         throws CoderException, IOException {
-      throw new CoderException("Test Enccode Exception");
+      throw new CoderException("Test Encode Exception");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 9250dfa..62edac9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -28,10 +28,10 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -326,7 +326,7 @@ public class  CombineFnsTest {
     }
   }
 
-  private static class UserStringCoder extends CustomCoder<UserString> {
+  private static class UserStringCoder extends AtomicCoder<UserString> {
     public static UserStringCoder of() {
       return INSTANCE;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 82c2504..12619e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -41,12 +41,12 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -874,7 +874,7 @@ public class CombineTest implements Serializable {
     /**
      * A {@link Coder} for {@link CountSum}.
      */
-    private class CountSumCoder extends CustomCoder<CountSum> {
+    private class CountSumCoder extends AtomicCoder<CountSum> {
       @Override
       public void encode(CountSum value, OutputStream outStream,
           Context context) throws CoderException, IOException {
@@ -923,7 +923,7 @@ public class CombineTest implements Serializable {
       }
 
       public static Coder<Accumulator> getCoder() {
-        return new CustomCoder<Accumulator>() {
+        return new AtomicCoder<Accumulator>() {
           @Override
           public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context)
               throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 89a1f33..76f61b3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -36,10 +36,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -132,7 +132,7 @@ public class CreateTest {
   static class Record2 extends Record {
   }
 
-  private static class RecordCoder extends CustomCoder<Record> {
+  private static class RecordCoder extends AtomicCoder<Record> {
     @Override
     public void encode(Record value, OutputStream outStream, Context context)
         throws CoderException, IOException {}
@@ -201,7 +201,7 @@ public class CreateTest {
       return myString.equals(((UnserializableRecord) o).myString);
     }
 
-    static class UnserializableRecordCoder extends CustomCoder<UnserializableRecord> {
+    static class UnserializableRecordCoder extends AtomicCoder<UnserializableRecord> {
       private final Coder<String> stringCoder = StringUtf8Coder.of();
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 9cb642a..6982e01 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -38,9 +38,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -453,7 +453,7 @@ public class GroupByKeyTest {
   /**
    * Deterministic {@link Coder} for {@link BadEqualityKey}.
    */
-  static class DeterministicKeyCoder extends CustomCoder<BadEqualityKey> {
+  static class DeterministicKeyCoder extends AtomicCoder<BadEqualityKey> {
 
     public static DeterministicKeyCoder of() {
       return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index ffdf3d0..073957f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,9 +55,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.SetCoder;
@@ -976,7 +976,7 @@ public class ParDoTest implements Serializable {
 
   private static class TestDummy { }
 
-  private static class TestDummyCoder extends CustomCoder<TestDummy> {
+  private static class TestDummyCoder extends AtomicCoder<TestDummy> {
     private TestDummyCoder() { }
     private static final TestDummyCoder INSTANCE = new TestDummyCoder();
 
@@ -1085,7 +1085,7 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  private static class MyIntegerCoder extends CustomCoder<MyInteger> {
+  private static class MyIntegerCoder extends AtomicCoder<MyInteger> {
     private static final MyIntegerCoder INSTANCE = new MyIntegerCoder();
 
     private final VarIntCoder delegate = VarIntCoder.of();

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index e72c540..84f3d69 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -39,9 +39,9 @@ import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -505,7 +505,7 @@ public class ViewTest implements Serializable {
     pipeline.run();
   }
 
-  private static class NonDeterministicStringCoder extends CustomCoder<String> {
+  private static class NonDeterministicStringCoder extends AtomicCoder<String> {
     @Override
     public void encode(String value, OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 19b6092..f26dd59 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -35,8 +35,8 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -270,7 +270,7 @@ public class DoFnInvokersTest {
   private abstract static class SomeRestrictionTracker
       implements RestrictionTracker<SomeRestriction> {}
 
-  private static class SomeRestrictionCoder extends CustomCoder<SomeRestriction> {
+  private static class SomeRestrictionCoder extends AtomicCoder<SomeRestriction> {
     public static SomeRestrictionCoder of() {
       return new SomeRestrictionCoder();
     }
@@ -392,7 +392,7 @@ public class DoFnInvokersTest {
     public void checkDone() throws IllegalStateException {}
   }
 
-  private static class CoderForDefaultTracker extends CustomCoder<RestrictionWithDefaultTracker> {
+  private static class CoderForDefaultTracker extends AtomicCoder<RestrictionWithDefaultTracker> {
     public static CoderForDefaultTracker of() {
       return new CoderForDefaultTracker();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
index 0db5355..7230a8b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
@@ -24,10 +24,10 @@ import static org.mockito.Mockito.mock;
 
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.testing.CoderPropertiesTest.ClosingCoder;
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,7 +44,7 @@ public class CoderUtilsTest {
   @Rule
   public transient ExpectedException expectedException = ExpectedException.none();
 
-  static class TestCoder extends CustomCoder<Integer> {
+  static class TestCoder extends AtomicCoder<Integer> {
     public static TestCoder of() {
       return new TestCoder();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
index 2d5baf2..6ba1d4a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
@@ -25,9 +25,9 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -85,7 +85,7 @@ public class SerializableUtilsTest {
   }
 
   /** A {@link Coder} that is not serializable by Java. */
-  private static class UnserializableCoderByJava extends CustomCoder<Object> {
+  private static class UnserializableCoderByJava extends AtomicCoder<Object> {
     private final Object unserializableField = new Object();
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
index d4e6f63..0781cf1 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
@@ -22,9 +22,9 @@ import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * <p>When this code is used in a nested {@link Coder.Context}, the serialized {@link ByteString}
  * objects are first delimited by their size.
  */
-public class ByteStringCoder extends CustomCoder<ByteString> {
+public class ByteStringCoder extends AtomicCoder<ByteString> {
 
   public static ByteStringCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
index 26b4b56..f04e9b9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
@@ -25,7 +25,7 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 
 
@@ -34,11 +34,14 @@ import org.apache.beam.sdk.coders.VarIntCoder;
  */
 @VisibleForTesting
 class ShardedKeyCoder<KeyT>
-    extends CustomCoder<ShardedKey<KeyT>> {
+    extends StructuredCoder<ShardedKey<KeyT>> {
   public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
     return new ShardedKeyCoder<>(keyCoder);
   }
 
+  private final Coder<KeyT> keyCoder;
+  private final VarIntCoder shardNumberCoder;
+
   protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
     this.keyCoder = keyCoder;
     this.shardNumberCoder = VarIntCoder.of();
@@ -68,7 +71,4 @@ class ShardedKeyCoder<KeyT>
   public void verifyDeterministic() throws NonDeterministicException {
     keyCoder.verifyDeterministic();
   }
-
-  Coder<KeyT> keyCoder;
-  VarIntCoder shardNumberCoder;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index 8a06d13..3059e2a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -21,12 +21,12 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
 /** A coder for {@link TableDestination} objects. */
-public class TableDestinationCoder extends CustomCoder<TableDestination> {
+public class TableDestinationCoder extends AtomicCoder<TableDestination> {
   private static final TableDestinationCoder INSTANCE = new TableDestinationCoder();
   private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index c3e48a4..2b1988a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -22,15 +22,15 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
 /**
  * Defines a coder for {@link TableRowInfo} objects.
  */
 @VisibleForTesting
-class TableRowInfoCoder extends CustomCoder<TableRowInfo> {
+class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
   private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder();
 
   public static TableRowInfoCoder of() {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index a1ca41b..7ca8958 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -23,15 +23,15 @@ import com.google.api.services.bigquery.model.TableRow;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format.
  */
-public class TableRowJsonCoder extends CustomCoder<TableRow> {
+public class TableRowJsonCoder extends AtomicCoder<TableRow> {
 
   public static TableRowJsonCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 70aa135..b896083 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -26,12 +26,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -71,7 +73,7 @@ class WriteBundlesToFiles<DestinationT>
   }
 
   /** a coder for the {@link Result} class. */
-  public static class ResultCoder<DestinationT> extends CustomCoder<Result<DestinationT>> {
+  public static class ResultCoder<DestinationT> extends StructuredCoder<Result<DestinationT>> {
     private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
     private static final VarLongCoder longCoder = VarLongCoder.of();
     private final Coder<DestinationT> destinationCoder;
@@ -105,6 +107,11 @@ class WriteBundlesToFiles<DestinationT>
     }
 
     @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.singletonList(destinationCoder);
+    }
+
+    @Override
     public void verifyDeterministic() {}
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 031d9a0..9f04a6c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -31,11 +31,11 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
@@ -100,7 +100,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
   /**
    * Coder for conveying outgoing messages between internal stages.
    */
-  private static class OutgoingMessageCoder extends CustomCoder<OutgoingMessage> {
+  private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
     private static final NullableCoder<String> RECORD_ID_CODER =
         NullableCoder.of(StringUtf8Coder.of());
     private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index c2cbe73..c16b8fb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -46,8 +46,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -118,7 +118,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
   /**
    * Coder for checkpoints.
    */
-  private static final PubsubCheckpointCoder CHECKPOINT_CODER = new PubsubCheckpointCoder();
+  private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = PubsubCheckpointCoder.of();
 
   /**
    * Maximum number of messages per pull.
@@ -357,11 +357,17 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
   }
 
   /** The coder for our checkpoints. */
-  private static class PubsubCheckpointCoder extends CustomCoder<PubsubCheckpoint> {
+  private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint> {
     private static final Coder<String> SUBSCRIPTION_PATH_CODER =
         NullableCoder.of(StringUtf8Coder.of());
     private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());
 
+    public static <T> PubsubCheckpointCoder<T> of() {
+      return new PubsubCheckpointCoder<>(); // a new instance on every call
+    }
+
+    private PubsubCheckpointCoder() {} // private: construct via of()
+
     @Override
     public void encode(PubsubCheckpoint value, OutputStream outStream, Context context)
         throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index a3b21ee..6c15f87 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -73,10 +73,10 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -720,7 +720,7 @@ public class BigQueryIOTest implements Serializable {
   /**
    * Coder for @link{PartitionedGlobalWindow}.
    */
-  private static class PartitionedGlobalWindowCoder extends CustomCoder<PartitionedGlobalWindow> {
+  private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> {
     @Override
     public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context)
         throws IOException, CoderException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
index 9589fb1..15877b0 100644
--- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -92,4 +93,21 @@ public class WritableCoder<T extends Writable> extends CustomCoder<T> {
         "Hadoop Writable may be non-deterministic.");
   }
 
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other instanceof WritableCoder) {
+      // Same coder class: equal exactly when both carry an equal type.
+      return Objects.equals(type, ((WritableCoder<?>) other).type);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(type); // null-safe, matching Objects.equals in equals()
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index 35a8863..7cc043c 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.Mut
  * A {@link Coder} that serializes and deserializes the {@link Mutation} objects using {@link
  * ProtobufUtil}.
  */
-class HBaseMutationCoder extends CustomCoder<Mutation> implements Serializable {
+class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
   private static final HBaseMutationCoder INSTANCE = new HBaseMutationCoder();
 
   private HBaseMutationCoder() {}

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
index 0004d03..24a5f7f 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
  * A {@link Coder} that serializes and deserializes the {@link Result} objects using {@link
  * ProtobufUtil}.
  */
-class HBaseResultCoder extends CustomCoder<Result> implements Serializable {
+class HBaseResultCoder extends AtomicCoder<Result> implements Serializable {
   private static final HBaseResultCoder INSTANCE = new HBaseResultCoder();
 
   private HBaseResultCoder() {}

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index f4de76a..e676455 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -55,11 +55,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.io.Read.Unbounded;
@@ -1595,11 +1595,11 @@ public class KafkaIO {
     }
   }
 
-  private static class NullOnlyCoder<T> extends CustomCoder<T> {
+  private static class NullOnlyCoder<T> extends AtomicCoder<T> {
     @Override
     public void encode(T value, OutputStream outStream, Context context) {
       checkArgument(value == null, "Can only encode nulls");
-      // Encode as the empty string.
+      // Encode as no bytes.
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 160e8ce..0f5dc4c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -23,9 +23,9 @@ import java.io.OutputStream;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.values.KV;
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.KV;
 /**
  * {@link Coder} for {@link KafkaRecord}.
  */
-public class KafkaRecordCoder<K, V> extends CustomCoder<KafkaRecord<K, V>> {
+public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
 
   private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
   private static final VarLongCoder longCoder = VarLongCoder.of();

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index 4da2b05..77fe127 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
@@ -32,7 +32,7 @@ import org.joda.time.Instant;
 /**
  * A {@link Coder} for {@link KinesisRecord}.
  */
-class KinesisRecordCoder extends CustomCoder<KinesisRecord> {
+class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
     private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
     private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
     private static final InstantCoder INSTANT_CODER = InstantCoder.of();

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
index 812bc70..75a4619 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -23,6 +23,7 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Objects;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
@@ -139,6 +140,23 @@ public class JAXBCoder<T> extends CustomCoder<T> {
     return TypeDescriptor.of(jaxbClass);
   }
 
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other instanceof JAXBCoder) {
+      // Same coder class: equal exactly when both carry an equal jaxbClass.
+      return Objects.equals(jaxbClass, ((JAXBCoder<?>) other).jaxbClass);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(jaxbClass); // null-safe, matching Objects.equals in equals()
+  }
+
   private static class CloseIgnoringInputStream extends FilterInputStream {
 
     protected CloseIgnoringInputStream(InputStream in) {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
index 940d596..2b4503a 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.xml.bind.annotation.XmlRootElement;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.testing.CoderProperties;
@@ -171,7 +171,7 @@ public class JAXBCoderTest {
   /**
    * A coder that surrounds the value with two values, to demonstrate nesting.
    */
-  private static class TestCoder extends CustomCoder<TestType> {
+  private static class TestCoder extends StructuredCoder<TestType> {
     private final JAXBCoder<TestType> jaxbCoder;
     public TestCoder(JAXBCoder<TestType> jaxbCoder) {
       this.jaxbCoder = jaxbCoder;