You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/05 21:45:13 UTC

[1/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types

Repository: beam
Updated Branches:
  refs/heads/master 650598836 -> 947fa68a5


http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 5107355..9568324 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -23,13 +23,11 @@ import static org.junit.Assert.assertEquals;
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.reflect.Type;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -74,28 +72,10 @@ public class CoderRegistryTest {
   private static class NotSerializableClass { }
 
   @Test
-  public void testSerializableFallbackCoderProvider() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    registry.setFallbackCoderProvider(SerializableCoder.PROVIDER);
-    Coder<?> serializableCoder = registry.getDefaultCoder(SerializableClass.class);
-
-    assertEquals(serializableCoder, SerializableCoder.of(SerializableClass.class));
-  }
-
-  @Test
-  public void testAvroFallbackCoderProvider() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    registry.setFallbackCoderProvider(AvroCoder.PROVIDER);
-    Coder<?> avroCoder = registry.getDefaultCoder(NotSerializableClass.class);
-
-    assertEquals(avroCoder, AvroCoder.of(NotSerializableClass.class));
-  }
-
-  @Test
   public void testRegisterInstantiatedCoder() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
-    registry.registerCoder(MyValue.class, MyValueCoder.of());
-    assertEquals(registry.getDefaultCoder(MyValue.class), MyValueCoder.of());
+    registry.registerCoderForClass(MyValue.class, MyValueCoder.of());
+    assertEquals(registry.getCoder(MyValue.class), MyValueCoder.of());
   }
 
   @SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes
@@ -121,17 +101,9 @@ public class CoderRegistryTest {
   }
 
   @Test
-  public void testRegisterInstantiatedCoderInvalidRawtype() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("may not be used with unspecialized generic classes");
-    CoderRegistry registry = CoderRegistry.createDefault();
-    registry.registerCoder(List.class, new MyListCoder());
-  }
-
-  @Test
   public void testSimpleDefaultCoder() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
-    assertEquals(StringUtf8Coder.of(), registry.getDefaultCoder(String.class));
+    assertEquals(StringUtf8Coder.of(), registry.getCoder(String.class));
   }
 
   @Test
@@ -139,11 +111,9 @@ public class CoderRegistryTest {
     CoderRegistry registry = CoderRegistry.createDefault();
     thrown.expect(CannotProvideCoderException.class);
     thrown.expectMessage(allOf(
-        containsString(UnknownType.class.getCanonicalName()),
-        containsString("No CoderFactory has been registered"),
-        containsString("does not have a @DefaultCoder annotation"),
-        containsString("does not implement Serializable")));
-    registry.getDefaultCoder(UnknownType.class);
+        containsString(UnknownType.class.getName()),
+        containsString("Unable to provide a Coder for")));
+    registry.getCoder(UnknownType.class);
   }
 
   @Test
@@ -151,14 +121,15 @@ public class CoderRegistryTest {
     CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<List<Integer>> listToken = new TypeDescriptor<List<Integer>>() {};
     assertEquals(ListCoder.of(VarIntCoder.of()),
-                 registry.getDefaultCoder(listToken));
+                 registry.getCoder(listToken));
 
-    registry.registerCoder(MyValue.class, MyValueCoder.class);
+    registry.registerCoderProvider(
+        CoderProviders.fromStaticMethods(MyValue.class, MyValueCoder.class));
     TypeDescriptor<KV<String, List<MyValue>>> kvToken =
         new TypeDescriptor<KV<String, List<MyValue>>>() {};
     assertEquals(KvCoder.of(StringUtf8Coder.of(),
                             ListCoder.of(MyValueCoder.of())),
-                 registry.getDefaultCoder(kvToken));
+                 registry.getCoder(kvToken));
 
   }
 
@@ -167,7 +138,7 @@ public class CoderRegistryTest {
     CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<Map<Integer, String>> mapToken = new TypeDescriptor<Map<Integer, String>>() {};
     assertEquals(MapCoder.of(VarIntCoder.of(), StringUtf8Coder.of()),
-                 registry.getDefaultCoder(mapToken));
+                 registry.getCoder(mapToken));
   }
 
   @Test
@@ -177,21 +148,21 @@ public class CoderRegistryTest {
         new TypeDescriptor<Map<Integer, Map<String, Double>>>() {};
     assertEquals(
         MapCoder.of(VarIntCoder.of(), MapCoder.of(StringUtf8Coder.of(), DoubleCoder.of())),
-        registry.getDefaultCoder(mapToken));
+        registry.getCoder(mapToken));
   }
 
   @Test
   public void testParameterizedDefaultSetCoder() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<Set<Integer>> setToken = new TypeDescriptor<Set<Integer>>() {};
-    assertEquals(SetCoder.of(VarIntCoder.of()), registry.getDefaultCoder(setToken));
+    assertEquals(SetCoder.of(VarIntCoder.of()), registry.getCoder(setToken));
   }
 
   @Test
   public void testParameterizedDefaultNestedSetCoder() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
     TypeDescriptor<Set<Set<Integer>>> setToken = new TypeDescriptor<Set<Set<Integer>>>() {};
-    assertEquals(SetCoder.of(SetCoder.of(VarIntCoder.of())), registry.getDefaultCoder(setToken));
+    assertEquals(SetCoder.of(SetCoder.of(VarIntCoder.of())), registry.getCoder(setToken));
   }
 
   @Test
@@ -201,11 +172,11 @@ public class CoderRegistryTest {
 
     thrown.expect(CannotProvideCoderException.class);
     thrown.expectMessage(String.format(
-        "Cannot provide coder for parameterized type %s: Unable to provide a default Coder for %s",
+        "Cannot provide coder for parameterized type %s: Unable to provide a Coder for %s",
         listUnknownToken,
-        UnknownType.class.getCanonicalName()));
+        UnknownType.class.getName()));
 
-    registry.getDefaultCoder(listUnknownToken);
+    registry.getCoder(listUnknownToken);
   }
 
   @Test
@@ -214,7 +185,7 @@ public class CoderRegistryTest {
     MyGenericClass<MyValue, List<MyValue>> instance =
         new MyGenericClass<MyValue, List<MyValue>>() {};
 
-    Coder<?> bazCoder = registry.getDefaultCoder(
+    Coder<?> bazCoder = registry.getCoder(
         instance.getClass(),
         MyGenericClass.class,
         Collections.<Type, Coder<?>>singletonMap(
@@ -230,7 +201,7 @@ public class CoderRegistryTest {
     MyGenericClass<MyValue, List<MyValue>> instance =
         new MyGenericClass<MyValue, List<MyValue>>() {};
 
-    Coder<?> fooCoder = registry.getDefaultCoder(
+    Coder<?> fooCoder = registry.getCoder(
         instance.getClass(),
         MyGenericClass.class,
         Collections.<Type, Coder<?>>singletonMap(
@@ -242,49 +213,6 @@ public class CoderRegistryTest {
   }
 
   @Test
-  public void testGetDefaultCoderFromIntegerValue() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    Integer i = 13;
-    Coder<Integer> coder = registry.getDefaultCoder(i);
-    assertEquals(VarIntCoder.of(), coder);
-  }
-
-  @Test
-  public void testGetDefaultCoderFromNullValue() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    assertEquals(VoidCoder.of(), registry.getDefaultCoder((Void) null));
-  }
-
-  @Test
-  public void testGetDefaultCoderFromKvValue() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    KV<Integer, String> kv = KV.of(13, "hello");
-    Coder<KV<Integer, String>> coder = registry.getDefaultCoder(kv);
-    assertEquals(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()),
-        coder);
-  }
-
-  @Test
-  public void testGetDefaultCoderFromKvNullValue() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    KV<Void, Void> kv = KV.of((Void) null, (Void) null);
-    assertEquals(KvCoder.of(VoidCoder.of(), VoidCoder.of()),
-        registry.getDefaultCoder(kv));
-  }
-
-  @Test
-  public void testGetDefaultCoderFromNestedKvValue() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-    KV<Integer, KV<Long, KV<String, String>>> kv = KV.of(13, KV.of(17L, KV.of("hello", "goodbye")));
-    Coder<KV<Integer, KV<Long, KV<String, String>>>> coder = registry.getDefaultCoder(kv);
-    assertEquals(
-        KvCoder.of(VarIntCoder.of(),
-            KvCoder.of(VarLongCoder.of(),
-                KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))),
-        coder);
-  }
-
-  @Test
   public void testTypeCompatibility() throws Exception {
     CoderRegistry.verifyCompatible(BigEndianIntegerCoder.of(), Integer.class);
     CoderRegistry.verifyCompatible(
@@ -335,7 +263,7 @@ public class CoderRegistryTest {
   public void testDefaultCoderAnnotationGenericRawtype() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
     assertEquals(
-        registry.getDefaultCoder(MySerializableGeneric.class),
+        registry.getCoder(MySerializableGeneric.class),
         SerializableCoder.of(MySerializableGeneric.class));
   }
 
@@ -343,7 +271,7 @@ public class CoderRegistryTest {
   public void testDefaultCoderAnnotationGeneric() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
     assertEquals(
-        registry.getDefaultCoder(new TypeDescriptor<MySerializableGeneric<String>>() {}),
+        registry.getCoder(new TypeDescriptor<MySerializableGeneric<String>>() {}),
         SerializableCoder.of(MySerializableGeneric.class));
   }
 
@@ -371,11 +299,8 @@ public class CoderRegistryTest {
     CoderRegistry registry = CoderRegistry.createDefault();
 
     thrown.expect(CannotProvideCoderException.class);
-    thrown.expectMessage(allOf(
-        containsString("No CoderFactory has been registered"),
-        containsString("does not have a @DefaultCoder annotation"),
-        containsString("does not implement Serializable")));
-    registry.getDefaultCoder(TypeDescriptor.of(
+    thrown.expectMessage("Unable to provide a Coder");
+    registry.getCoder(TypeDescriptor.of(
         TestGenericClass.class.getTypeParameters()[0]));
   }
 
@@ -388,8 +313,7 @@ public class CoderRegistryTest {
 
     TypeDescriptor type = TypeDescriptor.of(
         TestSerializableGenericClass.class.getTypeParameters()[0]);
-    assertEquals(registry.getDefaultCoder(type),
-        SerializableCoder.of(type));
+    assertEquals(SerializableCoder.of(type), registry.getCoder(type));
   }
 
   private static class TestSerializableGenericClass<TestGenericT extends Serializable> {}
@@ -450,12 +374,6 @@ public class CoderRegistryTest {
       return INSTANCE;
     }
 
-    @SuppressWarnings("unused")
-    public static List<Object> getInstanceComponents(
-        @SuppressWarnings("unused") MyValue exampleValue) {
-      return Arrays.asList();
-    }
-
     @Override
     public void encode(MyValue value, OutputStream outStream, Context context)
         throws CoderException, IOException {
@@ -498,7 +416,12 @@ public class CoderRegistryTest {
     }
   }
 
-  private static class UnknownType { }
+  /**
+   * This type is incompatible with all known coder providers such as Serializable,
+   * {@code @DefaultCoder} which allows testing scenarios where coder lookup fails.
+   */
+  private static class UnknownType {
+  }
 
   @DefaultCoder(SerializableCoder.class)
   private static class MySerializableGeneric<T extends Serializable> implements Serializable {
@@ -506,20 +429,34 @@ public class CoderRegistryTest {
     private T foo;
   }
 
+  /**
+   * This type is incompatible with all known coder providers such as Serializable,
+   * {@code @DefaultCoder} which allows testing the automatic registration mechanism.
+   */
+  private static class AutoRegistrationClass {
+  }
+
+  private static class AutoRegistrationClassCoder extends CustomCoder<AutoRegistrationClass> {
+    private static final AutoRegistrationClassCoder INSTANCE = new AutoRegistrationClassCoder();
+  }
+
   @Test
-  public void testAutomaticRegistrationOfCoders() throws Exception {
-    assertEquals(CoderRegistry.createDefault().getDefaultCoder(MyValue.class), MyValueCoder.of());
+  public void testAutomaticRegistrationOfCoderProviders() throws Exception {
+    assertEquals(AutoRegistrationClassCoder.INSTANCE,
+        CoderRegistry.createDefault().getCoder(AutoRegistrationClass.class));
   }
 
   /**
-   * A {@link CoderRegistrar} to demonstrate default {@link Coder} registration.
+   * A {@link CoderProviderRegistrar} to demonstrate default {@link Coder} registration.
    */
-  @AutoService(CoderRegistrar.class)
-  public static class RegisteredTestCoderRegistrar implements CoderRegistrar {
+  @AutoService(CoderProviderRegistrar.class)
+  public static class RegisteredTestCoderProviderRegistrar implements CoderProviderRegistrar {
     @Override
-    public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
-      return ImmutableMap.<Class<?>, CoderFactory>of(
-          MyValue.class, CoderFactories.forCoder(MyValueCoder.of()));
+    public List<CoderProvider> getCoderProviders() {
+      return ImmutableList.of(
+          CoderProviders.forCoder(
+              TypeDescriptor.of(AutoRegistrationClass.class),
+              AutoRegistrationClassCoder.INSTANCE));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
index d335b18..aa8d94c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
@@ -18,13 +18,15 @@
 package org.apache.beam.sdk.coders;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
+import org.apache.beam.sdk.coders.DefaultCoder.DefaultCoderProviderRegistrar.DefaultCoderProvider;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -32,15 +34,12 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests of Coder defaults.
+ * Tests for {@link DefaultCoder}.
  */
 @RunWith(JUnit4.class)
 public class DefaultCoderTest {
 
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  public CoderRegistry registry = CoderRegistry.createDefault();
+  @Rule public ExpectedException thrown = ExpectedException.none();
 
   @DefaultCoder(AvroCoder.class)
   private static class AvroRecord {
@@ -75,6 +74,17 @@ public class DefaultCoderTest {
     protected CustomSerializableCoder() {
       super(CustomRecord.class, TypeDescriptor.of(CustomRecord.class));
     }
+
+    @SuppressWarnings("unused")
+    public static CoderProvider getCoderProvider() {
+      return new CoderProvider() {
+        @Override
+        public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+            List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+          return CustomSerializableCoder.of((TypeDescriptor) typeDescriptor);
+        }
+      };
+    }
   }
 
   private static class OldCustomSerializableCoder extends SerializableCoder<OldCustomRecord> {
@@ -89,33 +99,52 @@ public class DefaultCoderTest {
     protected OldCustomSerializableCoder() {
       super(OldCustomRecord.class, TypeDescriptor.of(OldCustomRecord.class));
     }
+
+    @SuppressWarnings("unused")
+    public static CoderProvider getCoderProvider() {
+      return new CoderProvider() {
+        @Override
+        public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+            List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+          return OldCustomSerializableCoder.of((Class) typeDescriptor.getRawType());
+        }
+      };
+    }
   }
 
   @Test
-  public void testDefaultCoderClasses() throws Exception {
-    assertThat(registry.getDefaultCoder(AvroRecord.class), instanceOf(AvroCoder.class));
-    assertThat(registry.getDefaultCoder(SerializableBase.class),
-        instanceOf(SerializableCoder.class));
-    assertThat(registry.getDefaultCoder(SerializableRecord.class),
+  public void testCodersWithoutComponents() throws Exception {
+    CoderRegistry registry = CoderRegistry.createDefault();
+    registry.registerCoderProvider(
+        new DefaultCoderProvider());
+    assertThat(registry.getCoder(AvroRecord.class),
+        instanceOf(AvroCoder.class));
+    assertThat(registry.getCoder(SerializableRecord.class),
         instanceOf(SerializableCoder.class));
-    assertThat(registry.getDefaultCoder(CustomRecord.class),
+    assertThat(registry.getCoder(CustomRecord.class),
         instanceOf(CustomSerializableCoder.class));
-    assertThat(registry.getDefaultCoder(OldCustomRecord.class),
+    assertThat(registry.getCoder(OldCustomRecord.class),
         instanceOf(OldCustomSerializableCoder.class));
   }
 
   @Test
   public void testDefaultCoderInCollection() throws Exception {
-    assertThat(registry.getDefaultCoder(new TypeDescriptor<List<AvroRecord>>(){}),
-        instanceOf(ListCoder.class));
-    assertThat(registry.getDefaultCoder(new TypeDescriptor<List<SerializableRecord>>(){}),
-        equalTo((Coder<List<SerializableRecord>>)
+    CoderRegistry registry = CoderRegistry.createDefault();
+    registry.registerCoderProvider(
+        new DefaultCoderProvider());
+    Coder<List<AvroRecord>> avroRecordCoder =
+        registry.getCoder(new TypeDescriptor<List<AvroRecord>>(){});
+    assertThat(avroRecordCoder, instanceOf(ListCoder.class));
+    assertThat(((ListCoder) avroRecordCoder).getElemCoder(), instanceOf(AvroCoder.class));
+    assertThat(registry.getCoder(new TypeDescriptor<List<SerializableRecord>>(){}),
+        Matchers.<Coder<List<SerializableRecord>>>equalTo(
             ListCoder.of(SerializableCoder.of(SerializableRecord.class))));
   }
 
   @Test
   public void testUnknown() throws Exception {
     thrown.expect(CannotProvideCoderException.class);
-    registry.getDefaultCoder(Unknown.class);
+    new DefaultCoderProvider().coderFor(
+        TypeDescriptor.of(Unknown.class), Collections.<Coder<?>>emptyList());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
index 1e56135..1141fb5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.coders;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import java.util.Arrays;
@@ -62,21 +60,6 @@ public class IterableCoderTest {
   }
 
   @Test
-  public void testGetInstanceComponentsNonempty() {
-    Iterable<Integer> iterable = Arrays.asList(2, 58, 99, 5);
-    List<Object> components = IterableCoder.getInstanceComponents(iterable);
-    assertEquals(1, components.size());
-    assertEquals(2, components.get(0));
-  }
-
-  @Test
-  public void testGetInstanceComponentsEmpty() {
-    Iterable<Integer> iterable = Arrays.asList();
-    List<Object> components = IterableCoder.getInstanceComponents(iterable);
-    assertNull(components);
-  }
-
-  @Test
   public void testCoderSerializable() throws Exception {
     CoderProperties.coderSerializable(TEST_CODER);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
index 35239d6..bcc4400 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.coders;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import java.util.Arrays;
@@ -62,21 +60,6 @@ public class ListCoderTest {
   }
 
   @Test
-  public void testGetInstanceComponentsNonempty() throws Exception {
-    List<Integer> list = Arrays.asList(21, 5, 3, 5);
-    List<Object> components = ListCoder.getInstanceComponents(list);
-    assertEquals(1, components.size());
-    assertEquals(21, components.get(0));
-  }
-
-  @Test
-  public void testGetInstanceComponentsEmpty() throws Exception {
-    List<Integer> list = Arrays.asList();
-    List<Object> components = ListCoder.getInstanceComponents(list);
-    assertNull(components);
-  }
-
-  @Test
   public void testEmptyList() throws Exception {
     List<Integer> list = Collections.emptyList();
     Coder<List<Integer>> coder = ListCoder.of(VarIntCoder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
index a52e6cb..19c895e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
@@ -18,14 +18,11 @@
 package org.apache.beam.sdk.coders;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -64,23 +61,6 @@ public class MapCoderTest {
         MapCoder.of(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
   }
 
-  @Test
-  public void testGetInstanceComponentsNonempty() {
-    Map<Integer, String> map = new HashMap<>();
-    map.put(17, "foozle");
-    List<Object> components = MapCoder.getInstanceComponents(map);
-    assertEquals(2, components.size());
-    assertEquals(17, components.get(0));
-    assertEquals("foozle", components.get(1));
-  }
-
-  @Test
-  public void testGetInstanceComponentsEmpty() {
-    Map<Integer, String> map = new HashMap<>();
-    List<Object> components = MapCoder.getInstanceComponents(map);
-    assertNull(components);
-  }
-
   /**
    * Generated data to check that the wire format has not changed. To regenerate, see
    * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index ef6df34..d97eea6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -230,4 +231,14 @@ public class SerializableCoderTest implements Serializable {
         SerializableCoder.of(MyRecord.class).getEncodedTypeDescriptor(),
         Matchers.equalTo(TypeDescriptor.of(MyRecord.class)));
   }
+
+  private static class AutoRegistration implements Serializable {
+    private static final long serialVersionUID = 42L;
+  }
+
+  @Test
+  public void testSerializableCoderProviderIsRegistered() throws Exception {
+    assertThat(CoderRegistry.createDefault().getCoder(AutoRegistration.class),
+        instanceOf(SerializableCoder.class));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 62edac9..8a4d563 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -104,7 +104,7 @@ public class  CombineFnsTest {
   @Test
   @Category(ValidatesRunner.class)
   public void testComposedCombine() {
-    p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
+    p.getCoderRegistry().registerCoderForClass(UserString.class, UserStringCoder.of());
 
     PCollection<KV<String, KV<Integer, UserString>>> perKeyInput = p.apply(
         Create.timestamped(
@@ -156,7 +156,7 @@ public class  CombineFnsTest {
   @Test
   @Category(ValidatesRunner.class)
   public void testComposedCombineWithContext() {
-    p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
+    p.getCoderRegistry().registerCoderForClass(UserString.class, UserStringCoder.of());
 
     PCollectionView<String> view = p
         .apply(Create.of("I"))
@@ -218,8 +218,10 @@ public class  CombineFnsTest {
   @Test
   @Category(ValidatesRunner.class)
   public void testComposedCombineNullValues() {
-    p.getCoderRegistry().registerCoder(UserString.class, NullableCoder.of(UserStringCoder.of()));
-    p.getCoderRegistry().registerCoder(String.class, NullableCoder.of(StringUtf8Coder.of()));
+    p.getCoderRegistry().registerCoderForClass(
+        UserString.class, NullableCoder.of(UserStringCoder.of()));
+    p.getCoderRegistry().registerCoderForClass(
+        String.class, NullableCoder.of(StringUtf8Coder.of()));
 
     PCollection<KV<String, KV<Integer, UserString>>> perKeyInput = p.apply(
         Create.timestamped(

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 76f61b3..a458812 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -321,7 +321,7 @@ public class CreateTest {
   @Test
   public void testCreateTimestampedDefaultOutputCoderUsingTypeDescriptor() throws Exception {
     Coder<Record> coder = new RecordCoder();
-    p.getCoderRegistry().registerCoder(Record.class, coder);
+    p.getCoderRegistry().registerCoderForClass(Record.class, coder);
     PBegin pBegin = PBegin.in(p);
     Create.TimestampedValues<Record> values =
         Create.timestamped(
@@ -364,7 +364,7 @@ public class CreateTest {
   @Test
   public void testCreateDefaultOutputCoderUsingInference() throws Exception {
     Coder<Record> coder = new RecordCoder();
-    p.getCoderRegistry().registerCoder(Record.class, coder);
+    p.getCoderRegistry().registerCoderForClass(Record.class, coder);
     PBegin pBegin = PBegin.in(p);
     Create.Values<Record> values = Create.of(new Record(), new Record(), new Record());
     Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
@@ -384,7 +384,7 @@ public class CreateTest {
   @Test
   public void testCreateDefaultOutputCoderUsingTypeDescriptor() throws Exception {
     Coder<Record> coder = new RecordCoder();
-    p.getCoderRegistry().registerCoder(Record.class, coder);
+    p.getCoderRegistry().registerCoderForClass(Record.class, coder);
     PBegin pBegin = PBegin.in(p);
     Create.Values<Record> values =
         Create.of(new Record(), new Record2()).withType(new TypeDescriptor<Record>() {});

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index b24071e..11f284f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -94,8 +94,8 @@ public class FlatMapElementsTest implements Serializable {
 
     assertThat(output.getTypeDescriptor(),
         equalTo((TypeDescriptor<String>) new TypeDescriptor<String>() {}));
-    assertThat(pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor()),
-        equalTo(pipeline.getCoderRegistry().getDefaultCoder(new TypeDescriptor<String>() {})));
+    assertThat(pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor()),
+        equalTo(pipeline.getCoderRegistry().getCoder(new TypeDescriptor<String>() {})));
 
     // Make sure the pipeline runs
     pipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 6982e01..aba33eb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -392,7 +393,8 @@ public class GroupByKeyTest {
     final int numValues = 10;
     final int numKeys = 5;
 
-    p.getCoderRegistry().registerCoder(BadEqualityKey.class, DeterministicKeyCoder.class);
+    p.getCoderRegistry().registerCoderProvider(
+        CoderProviders.fromStaticMethods(BadEqualityKey.class, DeterministicKeyCoder.class));
 
     // construct input data
     List<KV<BadEqualityKey, Long>> input = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 7bf94a0..241b60e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -187,8 +187,8 @@ public class MapElementsTest implements Serializable {
         }));
     assertThat(output.getTypeDescriptor(),
         equalTo((TypeDescriptor<String>) new TypeDescriptor<String>() {}));
-    assertThat(pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor()),
-        equalTo(pipeline.getCoderRegistry().getDefaultCoder(new TypeDescriptor<String>() {})));
+    assertThat(pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor()),
+        equalTo(pipeline.getCoderRegistry().getCoder(new TypeDescriptor<String>() {})));
 
     // Make sure the pipeline runs too
     pipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 073957f..cbbe7f1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -985,11 +985,6 @@ public class ParDoTest implements Serializable {
       return INSTANCE;
     }
 
-    @SuppressWarnings("unused") // used to create a CoderFactory
-    public static List<Object> getInstanceComponents(TestDummy exampleValue) {
-      return Collections.emptyList();
-    }
-
     @Override
     public void encode(TestDummy value, OutputStream outStream, Context context)
         throws CoderException, IOException {
@@ -1657,7 +1652,7 @@ public class ParDoTest implements Serializable {
   public void testValueStateCoderInference() {
     final String stateId = "foo";
     MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
 
     DoFn<KV<String, Integer>, MyInteger> fn =
         new DoFn<KV<String, Integer>, MyInteger>() {
@@ -1750,7 +1745,7 @@ public class ParDoTest implements Serializable {
   public void testCoderInferenceOfList() {
     final String stateId = "foo";
     MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
 
     DoFn<KV<String, Integer>, List<MyInteger>> fn =
         new DoFn<KV<String, Integer>, List<MyInteger>>() {
@@ -1966,7 +1961,7 @@ public class ParDoTest implements Serializable {
   public void testBagStateCoderInference() {
     final String stateId = "foo";
     Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
 
     DoFn<KV<String, Integer>, List<MyInteger>> fn =
         new DoFn<KV<String, Integer>, List<MyInteger>>() {
@@ -2085,7 +2080,7 @@ public class ParDoTest implements Serializable {
     final String stateId = "foo";
     final String countStateId = "count";
     Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
 
     DoFn<KV<String, Integer>, Set<MyInteger>> fn =
         new DoFn<KV<String, Integer>, Set<MyInteger>>() {
@@ -2217,7 +2212,7 @@ public class ParDoTest implements Serializable {
     final String stateId = "foo";
     final String countStateId = "count";
     Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
 
     DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn =
         new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
@@ -2345,7 +2340,7 @@ public class ParDoTest implements Serializable {
   @Test
   @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testCombiningStateCoderInference() {
-    pipeline.getCoderRegistry().registerCoder(MyInteger.class, MyIntegerCoder.of());
+    pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, MyIntegerCoder.of());
 
     final String stateId = "foo";
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index f26dd59..489493a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.state.StateSpec;
@@ -422,7 +423,8 @@ public class DoFnInvokersTest {
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
 
     CoderRegistry coderRegistry = CoderRegistry.createDefault();
-    coderRegistry.registerCoder(RestrictionWithDefaultTracker.class, CoderForDefaultTracker.class);
+    coderRegistry.registerCoderProvider(CoderProviders.fromStaticMethods(
+        RestrictionWithDefaultTracker.class, CoderForDefaultTracker.class));
     assertThat(
         invoker.<RestrictionWithDefaultTracker>invokeGetRestrictionCoder(coderRegistry),
         instanceOf(CoderForDefaultTracker.class));

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index f752b1c..e1eb2fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -77,14 +78,10 @@ public class TypedPValueTest {
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("No Coder has been manually specified");
-    thrown.expectMessage(
-        containsString("Building a Coder using a registered CoderFactory failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the @DefaultCoder annotation failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the fallback CoderProvider failed"));
+    thrown.expectMessage("Building a Coder using a registered CoderProvider failed");
 
-    tuple.get(untypedOutputTag).getCoder();
+    Coder<?> coder = tuple.get(untypedOutputTag).getCoder();
+    System.out.println(coder);
   }
 
   @Test
@@ -96,12 +93,7 @@ public class TypedPValueTest {
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("No Coder has been manually specified");
-    thrown.expectMessage(
-        containsString("Building a Coder using a registered CoderFactory failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the @DefaultCoder annotation failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the fallback CoderProvider failed"));
+    thrown.expectMessage("Building a Coder using a registered CoderProvider failed");
 
     tuple.get(untypedOutputTag).getCoder();
   }
@@ -126,7 +118,10 @@ public class TypedPValueTest {
     assertThat(tuple.get(typedOutputTag).getCoder(), instanceOf(VarIntCoder.class));
   }
 
-  // A simple class for which there should be no obvious Coder.
+  /**
+   * This type is incompatible with all known coder providers such as Serializable,
+   * {@code @DefaultCoder} which allows testing coder registry lookup failure cases.
+   */
   static class EmptyClass {
   }
 
@@ -149,10 +144,7 @@ public class TypedPValueTest {
     thrown.expectMessage(not(containsString("erasure")));
     thrown.expectMessage(not(containsString("see TupleTag Javadoc")));
     // Instead, expect output suggesting other possible fixes.
-    thrown.expectMessage(containsString("Building a Coder using a registered CoderFactory failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the @DefaultCoder annotation failed"));
-    thrown.expectMessage(containsString("Building a Coder from the fallback CoderProvider failed"));
+    thrown.expectMessage("Building a Coder using a registered CoderProvider failed");
 
     input.getCoder();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
index 9577c6e..968a2fa 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -31,6 +31,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.SortedSet;
@@ -41,6 +42,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderProvider;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -105,13 +107,6 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  */
 public class ProtoCoder<T extends Message> extends CustomCoder<T> {
 
-  /**
-   * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
-   * {@link ExtensionRegistry}.
-   */
-  public static CoderProvider coderProvider() {
-    return PROVIDER;
-  }
 
   /**
    * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}.
@@ -291,36 +286,47 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
     return memoizedParser;
   }
 
-  static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {};
+  /**
+   * Returns a {@link CoderProvider} which uses the {@link ProtoCoder} for
+   * {@link Message proto messages}.
+   *
+   * <p>This method is invoked reflectively from {@link DefaultCoder}.
+   */
+  public static CoderProvider getCoderProvider() {
+    return new ProtoCoderProvider();
+  }
+
+  static final TypeDescriptor<Message> MESSAGE_TYPE = new TypeDescriptor<Message>() {};
 
   /**
-   * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
-   * {@link #coderProvider()}.
+   * A {@link CoderProvider} for {@link Message proto messages}.
    */
-  private static final CoderProvider PROVIDER =
-      new CoderProvider() {
-        @Override
-        public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-          if (!type.isSubtypeOf(CHECK)) {
-            throw new CannotProvideCoderException(
-                String.format(
-                    "Cannot provide %s because %s is not a subclass of %s",
-                    ProtoCoder.class.getSimpleName(),
-                    type,
-                    Message.class.getName()));
-          }
+  private static class ProtoCoderProvider extends CoderProvider {
 
-          @SuppressWarnings("unchecked")
-          TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? extends Message>) type;
-          try {
-            @SuppressWarnings("unchecked")
-            Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
-            return coder;
-          } catch (IllegalArgumentException e) {
-            throw new CannotProvideCoderException(e);
-          }
-        }
-      };
+    @Override
+    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+      if (!typeDescriptor.isSubtypeOf(MESSAGE_TYPE)) {
+        throw new CannotProvideCoderException(
+            String.format(
+                "Cannot provide %s because %s is not a subclass of %s",
+                ProtoCoder.class.getSimpleName(),
+                typeDescriptor,
+                Message.class.getName()));
+      }
+
+      @SuppressWarnings("unchecked")
+      TypeDescriptor<? extends Message> messageType =
+          (TypeDescriptor<? extends Message>) typeDescriptor;
+      try {
+        @SuppressWarnings("unchecked")
+        Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
+        return coder;
+      } catch (IllegalArgumentException e) {
+        throw new CannotProvideCoderException(e);
+      }
+    }
+  }
 
   private SortedSet<String> getSortedExtensionClasses() {
     SortedSet<String> ret = new TreeSet<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java
new file mode 100644
index 0000000..1cb79d7
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link CoderProviderRegistrar} for standard types used with Google Protobuf.
+ */
+@AutoService(CoderProviderRegistrar.class)
+public class ProtobufCoderProviderRegistrar implements CoderProviderRegistrar {
+  @Override
+  public List<CoderProvider> getCoderProviders() {
+    return ImmutableList.of(
+        CoderProviders.forCoder(TypeDescriptor.of(ByteString.class), ByteStringCoder.of()),
+        ProtoCoder.getCoderProvider());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java
deleted file mode 100644
index fb39a14..0000000
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.protobuf;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.ByteString;
-import java.util.Map;
-import org.apache.beam.sdk.coders.CoderFactories;
-import org.apache.beam.sdk.coders.CoderFactory;
-import org.apache.beam.sdk.coders.CoderRegistrar;
-
-/**
- * A {@link CoderRegistrar} for standard types used with Google Protobuf.
- */
-@AutoService(CoderRegistrar.class)
-public class ProtobufCoderRegistrar implements CoderRegistrar {
-  @Override
-  public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
-    return ImmutableMap.<Class<?>, CoderFactory>of(
-        ByteString.class, CoderFactories.forCoder(ByteStringCoder.of()));
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
index 6d00b86..d79bf4c 100644
--- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
+++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 import com.google.common.collect.ImmutableList;
+import java.util.Collections;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
@@ -53,7 +54,8 @@ public class ProtoCoderTest {
 
     assertEquals(
         ProtoCoder.of(new TypeDescriptor<MessageA>() {}),
-        ProtoCoder.coderProvider().getCoder(new TypeDescriptor<MessageA>() {}));
+        ProtoCoder.getCoderProvider().coderFor(
+            new TypeDescriptor<MessageA>() {}, Collections.<Coder<?>>emptyList()));
   }
 
   @Test
@@ -61,7 +63,8 @@ public class ProtoCoderTest {
     thrown.expect(CannotProvideCoderException.class);
     thrown.expectMessage("java.lang.Integer is not a subclass of com.google.protobuf.Message");
 
-    ProtoCoder.coderProvider().getCoder(new TypeDescriptor<Integer>() {});
+    ProtoCoder.getCoderProvider().coderFor(
+        new TypeDescriptor<Integer>() {}, Collections.<Coder<?>>emptyList());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java
new file mode 100644
index 0000000..c6fd849
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link CoderProviderRegistrar} for standard types used with {@link BigQueryIO}.
+ */
+@AutoService(CoderProviderRegistrar.class)
+public class BigQueryCoderProviderRegistrar implements CoderProviderRegistrar {
+  @Override
+  public List<CoderProvider> getCoderProviders() {
+    return ImmutableList.of(
+        CoderProviders.forCoder(TypeDescriptor.of(TableRow.class), TableRowJsonCoder.of()),
+        CoderProviders.forCoder(TypeDescriptor.of(TableRowInfo.class), TableRowInfoCoder.of()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
deleted file mode 100644
index 847c7b5..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.CoderFactories;
-import org.apache.beam.sdk.coders.CoderFactory;
-import org.apache.beam.sdk.coders.CoderRegistrar;
-
-/**
- * A {@link CoderRegistrar} for standard types used with {@link BigQueryIO}.
- */
-@AutoService(CoderRegistrar.class)
-public class BigQueryCoderRegistrar implements CoderRegistrar {
-  @Override
-  public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
-    return ImmutableMap.of(
-        TableRow.class, CoderFactories.forCoder(TableRowJsonCoder.of()),
-        TableRowInfo.class, CoderFactories.forCoder(TableRowInfoCoder.of()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index dc8bcff..edb1e0d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -168,7 +168,7 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
           Type parameter = parameterized.getActualTypeArguments()[1];
           @SuppressWarnings("unchecked")
           Class<DestinationT> parameterClass = (Class<DestinationT>) parameter;
-          return registry.getDefaultCoder(parameterClass);
+          return registry.getCoder(parameterClass);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
new file mode 100644
index 0000000..57bc903
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** A {@link CoderProviderRegistrar} for standard types used with {@link PubsubIO}. */
+@AutoService(CoderProviderRegistrar.class)
+public class PubsubCoderProviderRegistrar implements CoderProviderRegistrar {
+  @Override
+  public List<CoderProvider> getCoderProviders() {
+    return ImmutableList.of(
+        CoderProviders.forCoder(TypeDescriptor.of(PubsubMessage.class),
+            PubsubMessageWithAttributesCoder.of()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
deleted file mode 100644
index 062f350..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.CoderFactories;
-import org.apache.beam.sdk.coders.CoderFactory;
-import org.apache.beam.sdk.coders.CoderRegistrar;
-
-/** A {@link CoderRegistrar} for standard types used with {@link PubsubIO}. */
-@AutoService(CoderRegistrar.class)
-public class PubsubCoderRegistrar implements CoderRegistrar {
-  @Override
-  public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
-    return ImmutableMap.<Class<?>, CoderFactory>of(
-        PubsubMessage.class, CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java
new file mode 100644
index 0000000..bd5246a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link BigQueryCoderProviderRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryCoderProviderRegistrarTest {
+  @Test
+  public void testTableRowCoderIsRegistered() throws Exception {
+    CoderRegistry.createDefault().getCoder(TableRow.class);
+  }
+
+  @Test
+  public void testTableRowInfoCoderIsRegistered() throws Exception {
+    CoderRegistry.createDefault().getCoder(TableRowInfo.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
deleted file mode 100644
index e7e9fe1..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableRow;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link BigQueryCoderRegistrar}.
- */
-@RunWith(JUnit4.class)
-public class BigQueryCoderRegistrarTest {
-  @Test
-  public void testTableRowCoderIsRegistered() throws Exception {
-    CoderRegistry.createDefault().getDefaultCoder(TableRow.class);
-  }
-
-  @Test
-  public void testTableRowInfoCoderIsRegistered() throws Exception {
-    CoderRegistry.createDefault().getDefaultCoder(TableRowInfo.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index ebb4b39..9054f99 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -77,5 +77,12 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <!-- build dependencies -->
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
index 15877b0..8fddfe0 100644
--- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.hadoop;
 
+import com.google.auto.service.AutoService;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -25,9 +26,14 @@ import java.io.OutputStream;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
 import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -110,4 +116,54 @@ public class WritableCoder<T extends Writable> extends CustomCoder<T> {
     return type.hashCode();
   }
 
+  /**
+   * Returns a {@link CoderProvider} which uses the {@link WritableCoder} for Hadoop
+   * {@link Writable writable types}.
+   *
+   * <p>This method is invoked reflectively from {@link DefaultCoder}.
+   */
+  public static CoderProvider getCoderProvider() {
+    return new WritableCoderProvider();
+  }
+
+  /**
+   * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
+   * {@link Writable writable types}.
+   */
+  @AutoService(CoderProviderRegistrar.class)
+  public static class WritableCoderProviderRegistrar implements CoderProviderRegistrar {
+
+    @Override
+    public List<CoderProvider> getCoderProviders() {
+      return Collections.singletonList(getCoderProvider());
+    }
+  }
+
+  /**
+   * A {@link CoderProvider} for Hadoop {@link Writable writable types}.
+   */
+  private static class WritableCoderProvider extends CoderProvider {
+    private static final TypeDescriptor<Writable> WRITABLE_TYPE = new TypeDescriptor<Writable>() {};
+
+    @Override
+    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+      if (!typeDescriptor.isSubtypeOf(WRITABLE_TYPE)) {
+        throw new CannotProvideCoderException(
+            String.format(
+                "Cannot provide %s because %s does not implement the interface %s",
+                WritableCoder.class.getSimpleName(),
+                typeDescriptor,
+                Writable.class.getName()));
+      }
+
+      try {
+        @SuppressWarnings("unchecked")
+        Coder<T> coder = WritableCoder.of((Class) typeDescriptor.getRawType());
+        return coder;
+      } catch (IllegalArgumentException e) {
+        throw new CannotProvideCoderException(e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
index 8127773..9f2a54d 100644
--- a/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
+++ b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.beam.sdk.io.hadoop;
 
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -42,4 +46,10 @@ public class WritableCoderTest {
 
     CoderProperties.coderDecodeEncodeEqual(coder, value);
   }
+
+  @Test
+  public void testAutomaticRegistrationOfCoderProvider() throws Exception {
+    assertThat(CoderRegistry.createDefault().getCoder(NullWritable.class),
+        instanceOf(WritableCoder.class));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e676455..ba84c2a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -304,7 +304,7 @@ public class KafkaIO {
         Class<T> clazz = (Class<T>) parameter;
 
         try {
-          return NullableCoder.of(coderRegistry.getDefaultCoder(clazz));
+          return NullableCoder.of(coderRegistry.getCoder(clazz));
         } catch (CannotProvideCoderException e) {
           throw new RuntimeException(
                   String.format("Unable to automatically infer a Coder for "

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
index 790f51e..61df23b 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
@@ -89,13 +88,6 @@ public class DistinctJava8Test {
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes");
-    thrown.expectMessage("No Coder has been manually specified");
-    thrown.expectMessage(
-        containsString("Building a Coder using a registered CoderFactory failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the @DefaultCoder annotation failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the fallback CoderProvider failed"));
 
     // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is
     // implemented with

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
index a3481e1..b38250e 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
@@ -91,7 +91,7 @@ public class FilterJava8Test implements Serializable {
         .apply(Filter.by(s -> true));
 
     thrown.expect(CannotProvideCoderException.class);
-    pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor());
+    pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
index 7d97740..94353a5 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
@@ -69,6 +69,6 @@ public class PartitionJava8Test implements Serializable {
         .apply(Partition.of(1, (element, numPartitions) -> 0));
 
     thrown.expect(CannotProvideCoderException.class);
-    pipeline.getCoderRegistry().getDefaultCoder(output.get(0).getTypeDescriptor());
+    pipeline.getCoderRegistry().getCoder(output.get(0).getTypeDescriptor());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
index 4f0361e..260dd24 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.hamcrest.Matchers.containsString;
-
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -69,13 +67,6 @@ public class WithKeysJava8Test {
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys");
-    thrown.expectMessage("No Coder has been manually specified");
-    thrown.expectMessage(
-        containsString("Building a Coder using a registered CoderFactory failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the @DefaultCoder annotation failed"));
-    thrown.expectMessage(
-        containsString("Building a Coder from the fallback CoderProvider failed"));
 
     p.run();
   }


[4/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types

Posted by lc...@apache.org.
[BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types

This closes #2910


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/947fa68a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/947fa68a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/947fa68a

Branch: refs/heads/master
Commit: 947fa68a53068a2eb0ce9955685f7a46cc543192
Parents: 6505988 f8e2cf8
Author: Luke Cwik <lc...@google.com>
Authored: Fri May 5 14:45:03 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri May 5 14:45:03 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/complete/TfIdf.java    |   2 +-
 .../beam/examples/complete/TfIdfTest.java       |   2 +-
 .../beam/runners/core/SplittableParDo.java      |   4 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java |   2 +-
 .../beam/runners/direct/DirectRunnerTest.java   |  10 -
 .../org/apache/beam/sdk/coders/AtomicCoder.java |  14 -
 .../org/apache/beam/sdk/coders/AvroCoder.java   |  42 +-
 .../apache/beam/sdk/coders/ByteArrayCoder.java  |   8 -
 .../sdk/coders/CannotProvideCoderException.java |   2 +-
 .../java/org/apache/beam/sdk/coders/Coder.java  |   4 +-
 .../apache/beam/sdk/coders/CoderFactories.java  | 292 ---------
 .../apache/beam/sdk/coders/CoderFactory.java    |  44 --
 .../apache/beam/sdk/coders/CoderProvider.java   |  19 +-
 .../beam/sdk/coders/CoderProviderRegistrar.java |  42 ++
 .../apache/beam/sdk/coders/CoderProviders.java  | 240 +++----
 .../apache/beam/sdk/coders/CoderRegistrar.java  |  45 --
 .../apache/beam/sdk/coders/CoderRegistry.java   | 618 +++++++------------
 .../apache/beam/sdk/coders/CollectionCoder.java |   9 -
 .../org/apache/beam/sdk/coders/CustomCoder.java |   8 -
 .../apache/beam/sdk/coders/DefaultCoder.java    | 119 +++-
 .../apache/beam/sdk/coders/IterableCoder.java   |   9 -
 .../beam/sdk/coders/IterableLikeCoder.java      |  12 -
 .../org/apache/beam/sdk/coders/KvCoder.java     |   7 -
 .../org/apache/beam/sdk/coders/ListCoder.java   |   8 -
 .../org/apache/beam/sdk/coders/MapCoder.java    |  12 -
 .../beam/sdk/coders/SerializableCoder.java      |  57 +-
 .../org/apache/beam/sdk/coders/SetCoder.java    |   9 -
 .../apache/beam/sdk/coders/VarLongCoder.java    |   7 -
 .../beam/sdk/transforms/CombineFnBase.java      |   4 +-
 .../apache/beam/sdk/transforms/CombineFns.java  |   4 +-
 .../org/apache/beam/sdk/transforms/Create.java  |  76 ++-
 .../apache/beam/sdk/transforms/PTransform.java  |   4 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   8 +-
 .../apache/beam/sdk/transforms/WithKeys.java    |   4 +-
 .../sdk/transforms/windowing/GlobalWindow.java  |   6 -
 .../transforms/windowing/IntervalWindow.java    |   7 -
 .../org/apache/beam/sdk/values/PCollection.java |   2 +-
 .../beam/sdk/values/TimestampedValue.java       |   4 -
 .../beam/sdk/coders/CoderFactoriesTest.java     | 100 ---
 .../beam/sdk/coders/CoderProvidersTest.java     |  82 ++-
 .../beam/sdk/coders/CoderRegistryTest.java      | 167 ++---
 .../beam/sdk/coders/DefaultCoderTest.java       |  65 +-
 .../beam/sdk/coders/IterableCoderTest.java      |  17 -
 .../apache/beam/sdk/coders/ListCoderTest.java   |  17 -
 .../apache/beam/sdk/coders/MapCoderTest.java    |  20 -
 .../beam/sdk/coders/SerializableCoderTest.java  |  11 +
 .../beam/sdk/transforms/CombineFnsTest.java     |  10 +-
 .../apache/beam/sdk/transforms/CreateTest.java  |   6 +-
 .../sdk/transforms/FlatMapElementsTest.java     |   4 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |   4 +-
 .../beam/sdk/transforms/MapElementsTest.java    |   4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  17 +-
 .../transforms/reflect/DoFnInvokersTest.java    |   4 +-
 .../apache/beam/sdk/values/TypedPValueTest.java |  28 +-
 .../sdk/extensions/protobuf/ProtoCoder.java     |  72 ++-
 .../ProtobufCoderProviderRegistrar.java         |  41 ++
 .../protobuf/ProtobufCoderRegistrar.java        |  39 --
 .../sdk/extensions/protobuf/ProtoCoderTest.java |   7 +-
 .../BigQueryCoderProviderRegistrar.java         |  40 ++
 .../io/gcp/bigquery/BigQueryCoderRegistrar.java |  39 --
 .../io/gcp/bigquery/DynamicDestinations.java    |   2 +-
 .../pubsub/PubsubCoderProviderRegistrar.java    |  37 ++
 .../sdk/io/gcp/pubsub/PubsubCoderRegistrar.java |  35 --
 .../BigQueryCoderProviderRegistrarTest.java     |  40 ++
 .../bigquery/BigQueryCoderRegistrarTest.java    |  40 --
 sdks/java/io/hadoop-common/pom.xml              |   7 +
 .../beam/sdk/io/hadoop/WritableCoder.java       |  56 ++
 .../beam/sdk/io/hadoop/WritableCoderTest.java   |  10 +
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |   2 +-
 .../beam/sdk/transforms/DistinctJava8Test.java  |   8 -
 .../beam/sdk/transforms/FilterJava8Test.java    |   2 +-
 .../beam/sdk/transforms/PartitionJava8Test.java |   2 +-
 .../beam/sdk/transforms/WithKeysJava8Test.java  |   9 -
 73 files changed, 1121 insertions(+), 1689 deletions(-)
----------------------------------------------------------------------



[3/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types

Posted by lc...@apache.org.
[BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f8e2cf89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f8e2cf89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f8e2cf89

Branch: refs/heads/master
Commit: f8e2cf89febeb2f86d2dc91c8d3fff5d43df3623
Parents: 6505988
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 4 19:55:47 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri May 5 14:44:32 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/complete/TfIdf.java    |   2 +-
 .../beam/examples/complete/TfIdfTest.java       |   2 +-
 .../beam/runners/core/SplittableParDo.java      |   4 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java |   2 +-
 .../beam/runners/direct/DirectRunnerTest.java   |  10 -
 .../org/apache/beam/sdk/coders/AtomicCoder.java |  14 -
 .../org/apache/beam/sdk/coders/AvroCoder.java   |  42 +-
 .../apache/beam/sdk/coders/ByteArrayCoder.java  |   8 -
 .../sdk/coders/CannotProvideCoderException.java |   2 +-
 .../java/org/apache/beam/sdk/coders/Coder.java  |   4 +-
 .../apache/beam/sdk/coders/CoderFactories.java  | 292 ---------
 .../apache/beam/sdk/coders/CoderFactory.java    |  44 --
 .../apache/beam/sdk/coders/CoderProvider.java   |  19 +-
 .../beam/sdk/coders/CoderProviderRegistrar.java |  42 ++
 .../apache/beam/sdk/coders/CoderProviders.java  | 240 +++----
 .../apache/beam/sdk/coders/CoderRegistrar.java  |  45 --
 .../apache/beam/sdk/coders/CoderRegistry.java   | 618 +++++++------------
 .../apache/beam/sdk/coders/CollectionCoder.java |   9 -
 .../org/apache/beam/sdk/coders/CustomCoder.java |   8 -
 .../apache/beam/sdk/coders/DefaultCoder.java    | 119 +++-
 .../apache/beam/sdk/coders/IterableCoder.java   |   9 -
 .../beam/sdk/coders/IterableLikeCoder.java      |  12 -
 .../org/apache/beam/sdk/coders/KvCoder.java     |   7 -
 .../org/apache/beam/sdk/coders/ListCoder.java   |   8 -
 .../org/apache/beam/sdk/coders/MapCoder.java    |  12 -
 .../beam/sdk/coders/SerializableCoder.java      |  57 +-
 .../org/apache/beam/sdk/coders/SetCoder.java    |   9 -
 .../apache/beam/sdk/coders/VarLongCoder.java    |   7 -
 .../beam/sdk/transforms/CombineFnBase.java      |   4 +-
 .../apache/beam/sdk/transforms/CombineFns.java  |   4 +-
 .../org/apache/beam/sdk/transforms/Create.java  |  76 ++-
 .../apache/beam/sdk/transforms/PTransform.java  |   4 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   8 +-
 .../apache/beam/sdk/transforms/WithKeys.java    |   4 +-
 .../sdk/transforms/windowing/GlobalWindow.java  |   6 -
 .../transforms/windowing/IntervalWindow.java    |   7 -
 .../org/apache/beam/sdk/values/PCollection.java |   2 +-
 .../beam/sdk/values/TimestampedValue.java       |   4 -
 .../beam/sdk/coders/CoderFactoriesTest.java     | 100 ---
 .../beam/sdk/coders/CoderProvidersTest.java     |  82 ++-
 .../beam/sdk/coders/CoderRegistryTest.java      | 167 ++---
 .../beam/sdk/coders/DefaultCoderTest.java       |  65 +-
 .../beam/sdk/coders/IterableCoderTest.java      |  17 -
 .../apache/beam/sdk/coders/ListCoderTest.java   |  17 -
 .../apache/beam/sdk/coders/MapCoderTest.java    |  20 -
 .../beam/sdk/coders/SerializableCoderTest.java  |  11 +
 .../beam/sdk/transforms/CombineFnsTest.java     |  10 +-
 .../apache/beam/sdk/transforms/CreateTest.java  |   6 +-
 .../sdk/transforms/FlatMapElementsTest.java     |   4 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |   4 +-
 .../beam/sdk/transforms/MapElementsTest.java    |   4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  17 +-
 .../transforms/reflect/DoFnInvokersTest.java    |   4 +-
 .../apache/beam/sdk/values/TypedPValueTest.java |  28 +-
 .../sdk/extensions/protobuf/ProtoCoder.java     |  72 ++-
 .../ProtobufCoderProviderRegistrar.java         |  41 ++
 .../protobuf/ProtobufCoderRegistrar.java        |  39 --
 .../sdk/extensions/protobuf/ProtoCoderTest.java |   7 +-
 .../BigQueryCoderProviderRegistrar.java         |  40 ++
 .../io/gcp/bigquery/BigQueryCoderRegistrar.java |  39 --
 .../io/gcp/bigquery/DynamicDestinations.java    |   2 +-
 .../pubsub/PubsubCoderProviderRegistrar.java    |  37 ++
 .../sdk/io/gcp/pubsub/PubsubCoderRegistrar.java |  35 --
 .../BigQueryCoderProviderRegistrarTest.java     |  40 ++
 .../bigquery/BigQueryCoderRegistrarTest.java    |  40 --
 sdks/java/io/hadoop-common/pom.xml              |   7 +
 .../beam/sdk/io/hadoop/WritableCoder.java       |  56 ++
 .../beam/sdk/io/hadoop/WritableCoderTest.java   |  10 +
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |   2 +-
 .../beam/sdk/transforms/DistinctJava8Test.java  |   8 -
 .../beam/sdk/transforms/FilterJava8Test.java    |   2 +-
 .../beam/sdk/transforms/PartitionJava8Test.java |   2 +-
 .../beam/sdk/transforms/WithKeysJava8Test.java  |   9 -
 73 files changed, 1121 insertions(+), 1689 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 6fd9755..7552b94 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -409,7 +409,7 @@ public class TfIdf {
   public static void main(String[] args) throws Exception {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     Pipeline pipeline = Pipeline.create(options);
-    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+    pipeline.getCoderRegistry().registerCoderForClass(URI.class, StringDelegateCoder.of(URI.class));
 
     pipeline
         .apply(new ReadDocuments(listInputDocuments(options)))

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
index d263643..3681ff5 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
@@ -48,7 +48,7 @@ public class TfIdfTest {
   @Category(ValidatesRunner.class)
   public void testTfIdf() throws Exception {
 
-    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+    pipeline.getCoderRegistry().registerCoderForClass(URI.class, StringDelegateCoder.of(URI.class));
 
     PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
         .apply(Create.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index a4d9639..6503fa2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -274,7 +274,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
             input,
         PCollection<T> output)
         throws CannotProvideCoderException {
-      // Similar logic to ParDo.MultiOutput.getDefaultOutputCoder.
+      // Similar logic to ParDo.MultiOutput.getOutputCoder.
       @SuppressWarnings("unchecked")
       KeyedWorkItemCoder<String, ElementAndRestriction<InputT, RestrictionT>> kwiCoder =
           (KeyedWorkItemCoder) input.getCoder();
@@ -284,7 +284,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       return input
           .getPipeline()
           .getCoderRegistry()
-          .getDefaultCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder);
+          .getCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 46f26a1..1e60ca3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -229,7 +229,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     CoderRegistry reg = pipeline.getCoderRegistry();
     StateTag<CombiningState<Long, long[], Long>> stateTag =
         StateTags.combiningValue("summer",
-            sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
+            sumLongFn.getAccumulatorCoder(reg, reg.getCoder(Long.class)), sumLongFn);
     GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(0L));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 0fe9585..85e55eb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -577,14 +577,4 @@ public class DirectRunnerTest implements Serializable {
       return underlying.getDefaultOutputCoder();
     }
   }
-
-  @Test
-  public void fallbackCoderProviderAllowsInference() {
-    // See https://issues.apache.org/jira/browse/BEAM-1642
-    Pipeline p = getPipeline();
-    p.getCoderRegistry().setFallbackCoderProvider(
-        org.apache.beam.sdk.coders.AvroCoder.PROVIDER);
-    p.apply(Create.of(Arrays.asList(100, 200))).apply(Count.<Integer>globally());
-    p.run().waitUntilFinish();
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
index 528cfb0..043fe93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
@@ -33,20 +33,6 @@ import java.util.List;
  */
 public abstract class AtomicCoder<T> extends StructuredCoder<T> {
   /**
-   * Returns an empty list.
-   *
-   * <p>{@link CoderFactories#fromStaticMethods(Class)} builds a {@link CoderFactory} from the
-   * {@code #of()} method and this method, used to determine the components of an object. Because
-   * {@link AtomicCoder} has no components, always returns an empty list.
-   *
-   * @param exampleValue unused, but part of the latent interface expected by {@link
-   *     CoderFactories#fromStaticMethods}
-   */
-  public static <T> List<Object> getInstanceComponents(T exampleValue) {
-    return Collections.emptyList();
-  }
-
-  /**
    * {@inheritDoc}.
    *
    * @throws NonDeterministicException

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 2aa2b44..f82c065 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import javax.annotation.Nullable;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -140,18 +141,39 @@ public class AvroCoder<T> extends CustomCoder<T> {
     return new AvroCoder<>(type, schema);
   }
 
-  public static final CoderProvider PROVIDER = new CoderProvider() {
+  /**
+   * Returns a {@link CoderProvider} which uses the {@link AvroCoder} if possible for
+   * all types.
+   *
+   * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively
+   * accept dangerous types such as {@link Object}.
+   *
+   * <p>This method is invoked reflectively from {@link DefaultCoder}.
+   */
+  @SuppressWarnings("unused")
+  public static CoderProvider getCoderProvider() {
+    return new AvroCoderProvider();
+  }
+
+  /**
+   * A {@link CoderProvider} that constructs an {@link AvroCoder} for Avro compatible classes.
+   *
+   * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively
+   * accept dangerous types such as {@link Object}.
+   */
+  static class AvroCoderProvider extends CoderProvider {
     @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor) {
-      // This is a downcast from `? super T` to T. However, because
-      // it comes from a TypeDescriptor<T>, the class object itself
-      // is the same so the supertype in question shares the same
-      // generated AvroCoder schema.
-      @SuppressWarnings("unchecked")
-      Class<T> rawType = (Class<T>) typeDescriptor.getRawType();
-      return AvroCoder.of(rawType);
+    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+      try {
+        return AvroCoder.of(typeDescriptor);
+      } catch (AvroRuntimeException e) {
+        throw new CannotProvideCoderException(
+            String.format("%s is not compatible with Avro", typeDescriptor),
+            e);
+      }
     }
-  };
+  }
 
   private final Class<T> type;
   private final SerializableSchemaSupplier schemaSupplier;

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index 28cb627..d83d832 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -21,7 +21,6 @@ import com.google.common.io.ByteStreams;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
 import org.apache.beam.sdk.util.StreamUtils;
@@ -45,13 +44,6 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
     return INSTANCE;
   }
 
-  /**
-   * Returns an empty list. {@link ByteArrayCoder} has no components.
-   */
-  public static <T> List<Object> getInstanceComponents(T ignored) {
-    return Collections.emptyList();
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   private static final ByteArrayCoder INSTANCE = new ByteArrayCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
index c37ec00..bc2ef3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.coders;
 
 /**
- * The exception thrown when a {@link CoderProvider} cannot
+ * The exception thrown when a {@link CoderRegistry} or {@link CoderProvider} cannot
  * provide a {@link Coder} that has been requested.
  */
 public class CannotProvideCoderException extends Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 41e83ac..eeafbd2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -51,9 +51,9 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * <p>{@link Coder} classes for compound types are often composed from coder classes for types
  * contains therein. The composition of {@link Coder} instances into a coder for the compound
- * class is the subject of the {@link CoderFactory} type, which enables automatic generic
+ * class is the subject of the {@link CoderProvider} type, which enables automatic generic
  * composition of {@link Coder} classes within the {@link CoderRegistry}. With particular
- * static methods on a compound {@link Coder} class, a {@link CoderFactory} can be automatically
+ * static methods on a compound {@link Coder} class, a {@link CoderProvider} can be automatically
  * inferred. See {@link KvCoder} for an example of a simple compound {@link Coder} that supports
  * automatic composition in the {@link CoderRegistry}.
  *

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
deleted file mode 100644
index 4f05c95..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.MoreObjects;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * Static utility methods for creating and working with {@link Coder}s.
- */
-public final class CoderFactories {
-  private CoderFactories() { } // Static utility class
-
-  /**
-   * Creates a {@link CoderFactory} built from particular static methods of a class that
-   * implements {@link Coder}.
-   *
-   * <p>The class must have the following static methods:
-   *
-   * <ul>
-   * <li> {@code
-   * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...)
-   * }
-   * <li> {@code
-   * public static List<Object> getInstanceComponents(T exampleValue);
-   * }
-   * </ul>
-   *
-   * <p>The {@code of(...)} method will be used to construct a
-   * {@code Coder<T>} from component {@link Coder}s.
-   * It must accept one {@link Coder} argument for each
-   * generic type parameter of {@code T}. If {@code T} takes no generic
-   * type parameters, then the {@code of()} factory method should take
-   * no arguments.
-   *
-   * <p>The {@code getInstanceComponents} method will be used to
-   * decompose a value during the {@link Coder} inference process,
-   * to automatically choose coders for the components.
-   *
-   * <p>Note that the class {@code T} to be coded may be a
-   * not-yet-specialized generic class.
-   * For a generic class {@code MyClass<X>} and an actual type parameter
-   * {@code Foo}, the {@link CoderFactoryFromStaticMethods} will
-   * accept any {@code Coder<Foo>} and produce a {@code Coder<MyClass<Foo>>}.
-   *
-   * <p>For example, the {@link CoderFactory} returned by
-   * {@code fromStaticMethods(ListCoder.class)}
-   * will produce a {@code Coder<List<X>>} for any {@code Coder Coder<X>}.
-   */
-  public static <T> CoderFactory fromStaticMethods(Class<T> clazz) {
-    checkArgument(
-        Coder.class.isAssignableFrom(clazz),
-        "%s is not a subtype of %s",
-        clazz.getName(),
-        Coder.class.getSimpleName());
-    return new CoderFactoryFromStaticMethods((Class<? extends Coder>) clazz);
-  }
-
-  /**
-   * Creates a {@link CoderFactory} that always returns the
-   * given coder.
-   *
-   * <p>The {@code getInstanceComponents} method of this
-   * {@link CoderFactory} always returns an empty list.
-   */
-  public static <T> CoderFactory forCoder(Coder<T> coder) {
-    return new CoderFactoryForCoder<>(coder);
-  }
-
-  /**
-   * See {@link #fromStaticMethods} for a detailed description
-   * of the characteristics of this {@link CoderFactory}.
-   */
-  private static class CoderFactoryFromStaticMethods implements CoderFactory {
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
-      try {
-        return (Coder) factoryMethod.invoke(
-            null /* static */, componentCoders.toArray());
-      } catch (IllegalAccessException
-           | IllegalArgumentException
-           | InvocationTargetException
-           | NullPointerException
-           | ExceptionInInitializerError exn) {
-        throw new IllegalStateException(
-            "error when invoking Coder factory method " + factoryMethod,
-            exn);
-      }
-    }
-
-    @Override
-    public List<Object> getInstanceComponents(Object value) {
-      try {
-        @SuppressWarnings("unchecked")
-        List<Object> components =  (List<Object>) getComponentsMethod.invoke(
-            null /* static */, value);
-        return components;
-      } catch (IllegalAccessException
-          | IllegalArgumentException
-          | InvocationTargetException
-          | NullPointerException
-          | ExceptionInInitializerError exn) {
-        throw new IllegalStateException(
-            "error when invoking Coder getComponents method " + getComponentsMethod,
-            exn);
-      }
-    }
-
-    ////////////////////////////////////////////////////////////////////////////////
-
-    // Method to create a coder given component coders
-    // For a Coder class of kind * -> * -> ... n times ... -> *
-    // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T>
-    private final Method factoryMethod;
-
-    // Method to decompose a value of type T into its parts.
-    // For a Coder class of kind * -> * -> ... n times ... -> *
-    // this has type T -> List<Object>
-    // where the list has n elements.
-    private final Method getComponentsMethod;
-
-    /**
-     * Returns a CoderFactory that invokes the given static factory method
-     * to create the Coder.
-     */
-    private CoderFactoryFromStaticMethods(Class<? extends Coder> coderClazz) {
-      this.factoryMethod = getFactoryMethod(coderClazz);
-      this.getComponentsMethod = getInstanceComponentsMethod(coderClazz);
-    }
-
-    /**
-     * Returns the static {@code of} constructor method on {@code coderClazz}
-     * if it exists. It is assumed to have one {@link Coder} parameter for
-     * each type parameter of {@code coderClazz}.
-     */
-    private Method getFactoryMethod(Class<?> coderClazz) {
-      Method factoryMethodCandidate;
-
-      // Find the static factory method of coderClazz named 'of' with
-      // the appropriate number of type parameters.
-      int numTypeParameters = coderClazz.getTypeParameters().length;
-      Class<?>[] factoryMethodArgTypes = new Class<?>[numTypeParameters];
-      Arrays.fill(factoryMethodArgTypes, Coder.class);
-      try {
-        factoryMethodCandidate =
-            coderClazz.getDeclaredMethod("of", factoryMethodArgTypes);
-      } catch (NoSuchMethodException | SecurityException exn) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "does not have an accessible method named 'of' with "
-            + numTypeParameters + " arguments of Coder type",
-            exn);
-      }
-      if (!Modifier.isStatic(factoryMethodCandidate.getModifiers())) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "method named 'of' with " + numTypeParameters
-            + " arguments of Coder type is not static");
-      }
-      if (!coderClazz.isAssignableFrom(factoryMethodCandidate.getReturnType())) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "method named 'of' with " + numTypeParameters
-            + " arguments of Coder type does not return a " + coderClazz);
-      }
-      try {
-        if (!factoryMethodCandidate.isAccessible()) {
-          factoryMethodCandidate.setAccessible(true);
-        }
-      } catch (SecurityException exn) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "method named 'of' with " + numTypeParameters
-            + " arguments of Coder type is not accessible",
-            exn);
-      }
-
-      return factoryMethodCandidate;
-    }
-
-    /**
-     * Finds the static method on {@code coderType} to use
-     * to decompose a value of type {@code T} into components,
-     * each corresponding to an argument of the {@code of}
-     * method.
-     */
-    private <T, CoderT extends Coder> Method getInstanceComponentsMethod(Class<CoderT> coderClazz) {
-      TypeDescriptor<CoderT> coderType = TypeDescriptor.of(coderClazz);
-      TypeDescriptor<T> argumentType = getCodedType(coderType);
-
-      // getInstanceComponents may be implemented in a superclass,
-      // so we search them all for an applicable method. We do not
-      // try to be clever about finding the best overload. It may
-      // be in a generic superclass, erased to accept an Object.
-      // However, subtypes are listed before supertypes (it is a
-      // topological ordering) so probably the best one will be chosen
-      // if there are more than one (which should be rare)
-      for (TypeDescriptor<?> supertype : coderType.getClasses()) {
-        for (Method method : supertype.getRawType().getDeclaredMethods()) {
-          if (method.getName().equals("getInstanceComponents")) {
-            TypeDescriptor<?> formalArgumentType = supertype.getArgumentTypes(method).get(0);
-            if (formalArgumentType.getRawType().isAssignableFrom(argumentType.getRawType())) {
-              return method;
-            }
-          }
-        }
-      }
-
-      throw new IllegalArgumentException(
-          "cannot create a CoderFactory from " + coderType + ": "
-          + "does not have an accessible method "
-          + "'getInstanceComponents'");
-    }
-
-    /**
-     * If {@code coderType} is a subclass of {@link Coder} for a specific type {@code T}, returns
-     * {@code T.class}. Otherwise, raises IllegalArgumentException.
-     */
-    private <T, CoderT extends Coder> TypeDescriptor<T> getCodedType(
-        TypeDescriptor<CoderT> coderType) {
-      TypeDescriptor<? super CoderT> coderSupertype = coderType.getSupertype(Coder.class);
-      ParameterizedType coderIface = (ParameterizedType) coderSupertype.getType();
-      @SuppressWarnings("unchecked")
-      TypeDescriptor<T> token =
-          (TypeDescriptor<T>) TypeDescriptor.of(coderIface.getActualTypeArguments()[0]);
-      return token;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("factoryMethod", factoryMethod)
-          .add("getComponentsMethod", getComponentsMethod)
-          .toString();
-    }
-  }
-
-  /**
-   * See {@link #forCoder} for a detailed description of this
-   * {@link CoderFactory}.
-   */
-  private static class CoderFactoryForCoder<T> implements CoderFactory {
-    private final Coder<T> coder;
-
-    public CoderFactoryForCoder(Coder<T> coder) {
-      this.coder = coder;
-    }
-
-    @Override
-    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
-      return this.coder;
-    }
-
-    @Override
-    public List<Object> getInstanceComponents(Object value) {
-      return Collections.emptyList();
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("coder", coder)
-          .toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
deleted file mode 100644
index 22d03fa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import java.util.List;
-
-/**
- * A {@link CoderFactory} creates coders and decomposes values.
- * It may operate on a parameterized type, such as {@link List},
- * in which case the {@link #create} method accepts a list of
- * coders to use for the type parameters.
- */
-public interface CoderFactory {
-
-  /**
-   * Returns a {@code Coder<?>}, given argument coder to use for
-   * values of a particular type, given the Coders for each of
-   * the type's generic parameter types.
-   */
-  Coder<?> create(List<? extends Coder<?>> componentCoders);
-
-  /**
-   * Returns a list of objects contained in {@code value}, one per
-   * type argument, or {@code null} if none can be determined.
-   * The list of returned objects should be the same size as the
-   * list of coders required by {@link #create}.
-   */
-  List<Object> getInstanceComponents(Object value);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
index 0db73eb..ac042ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
@@ -17,18 +17,25 @@
  */
 package org.apache.beam.sdk.coders;
 
+import java.util.List;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
- * A {@link CoderProvider} may create a {@link Coder} for
- * any concrete class.
+ * A {@link CoderProvider} provides {@link Coder}s.
+ *
+ * <p>It may operate on a parameterized type, such as {@link List}, in which case the
+ * {@link #coderFor} method accepts a list of coders to use for the type parameters.
  */
-public interface CoderProvider {
+public abstract class CoderProvider {
 
   /**
-   * Provides a coder for a given class, if possible.
+   * Returns a {@code Coder<T>} to use for values of a particular type, given the Coders for each of
+   * the type's generic parameter types.
    *
-   * @throws CannotProvideCoderException if no coder can be provided
+   * <p>Throws {@link CannotProvideCoderException} if this {@link CoderProvider} cannot provide
+   * a coder for this type and components.
    */
-  <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException;
+  public abstract <T> Coder<T> coderFor(
+      TypeDescriptor<T> typeDescriptor, List<? extends Coder<?>> componentCoders)
+      throws CannotProvideCoderException;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
new file mode 100644
index 0000000..35d061d
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import com.google.auto.service.AutoService;
+import java.util.List;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * {@link Coder} creators have the ability to automatically have their
+ * {@link Coder coders} registered with this SDK by creating a {@link ServiceLoader} entry
+ * and a concrete implementation of this interface.
+ *
+ * <p>It is optional but recommended to use one of the many build time tools such as
+ * {@link AutoService} to generate the necessary META-INF files automatically.
+ */
+@Experimental
+public interface CoderProviderRegistrar {
+  /**
+   * Returns a list of {@link CoderProvider coder providers} which
+   * will be registered by default within each {@link CoderRegistry coder registry} instance.
+   *
+   * <p>See {@link CoderProviders} for convenience methods to construct a {@link CoderProvider}.
+   */
+  List<CoderProvider> getCoderProviders();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
index c072008..414fd8b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
@@ -19,146 +19,178 @@ package org.apache.beam.sdk.coders;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import com.google.common.base.MoreObjects;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
- * Static utility methods for working with {@link CoderProvider CoderProviders}.
+ * Static utility methods for creating and working with {@link CoderProvider}s.
  */
 public final class CoderProviders {
-
-  // Static utility class
-  private CoderProviders() { }
+  private CoderProviders() { } // Static utility class
 
   /**
-   * Creates a {@link CoderProvider} built from particular static methods of a class that
-   * implements {@link Coder}. The requirements for this method are precisely the requirements
-   * for a {@link Coder} class to be usable with {@link DefaultCoder} annotations.
-   *
-   * <p>The class must have the following static method:
-   *
-   * <pre>{@code
-   * public static Coder<T> of(TypeDescriptor<T> type)
-   * }
-   * </pre>
+   * Creates a {@link CoderProvider} from a class's
+   * {@code static <T> Coder<T> of(TypeDescriptor<T>, List<Coder<?>>}) method.
    */
-  public static <T> CoderProvider fromStaticMethods(Class<T> clazz) {
-    return new CoderProviderFromStaticMethods(clazz);
+  public static CoderProvider fromStaticMethods(Class<?> rawType, Class<?> coderClazz) {
+    checkArgument(
+        Coder.class.isAssignableFrom(coderClazz),
+        "%s is not a subtype of %s",
+        coderClazz.getName(),
+        Coder.class.getSimpleName());
+    return new CoderProviderFromStaticMethods(rawType, coderClazz);
   }
 
-
   /**
-   * Returns a {@link CoderProvider} that consults each of the provider {@code coderProviders}
-   * and returns the first {@link Coder} provided.
-   *
-   * <p>Note that the order in which the providers are listed matters: While the set of types
-   * handled will be the union of those handled by all of the providers in the list, the actual
-   * {@link Coder} provided by the first successful provider may differ, and may have inferior
-   * properties. For example, not all {@link Coder Coders} are deterministic, handle {@code null}
-   * values, or have comparable performance.
+   * Creates a {@link CoderProvider} that always returns the
+   * given coder for the specified type.
    */
-  public static CoderProvider firstOf(CoderProvider... coderProviders) {
-    return new FirstOf(ImmutableList.copyOf(coderProviders));
+  public static CoderProvider forCoder(TypeDescriptor<?> type, Coder<?> coder) {
+    return new CoderProviderForCoder(type, coder);
   }
 
-  ///////////////////////////////////////////////////////////////////////////////////////////////
-
   /**
-   * @see #firstOf
+   * See {@link #fromStaticMethods} for a detailed description
+   * of the characteristics of this {@link CoderProvider}.
    */
-  private static class FirstOf implements CoderProvider {
-
-    private Iterable<CoderProvider> providers;
-
-    public FirstOf(Iterable<CoderProvider> providers) {
-      this.providers = providers;
-    }
+  private static class CoderProviderFromStaticMethods extends CoderProvider {
 
     @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-      List<String> messages = Lists.newArrayList();
-      for (CoderProvider provider : providers) {
-        try {
-          return provider.getCoder(type);
-        } catch (CannotProvideCoderException exc) {
-          messages.add(String.format("%s could not provide a Coder for type %s: %s",
-              provider, type, exc.getMessage()));
-        }
+    public <T> Coder<T> coderFor(TypeDescriptor<T> type, List<? extends Coder<?>> componentCoders)
+        throws CannotProvideCoderException {
+      if (!this.rawType.equals(type.getRawType())) {
+        throw new CannotProvideCoderException(String.format(
+            "Unable to provide coder for %s, this factory can only provide coders for %s",
+            type,
+            this.rawType));
+      }
+      try {
+        return (Coder) factoryMethod.invoke(
+            null /* static */, componentCoders.toArray());
+      } catch (IllegalAccessException
+           | IllegalArgumentException
+           | InvocationTargetException
+           | NullPointerException
+           | ExceptionInInitializerError exn) {
+        throw new IllegalStateException(
+            "error when invoking Coder factory method " + factoryMethod,
+            exn);
       }
-      throw new CannotProvideCoderException(
-          String.format("Cannot provide coder for type %s: %s.",
-              type, Joiner.on("; ").join(messages)));
     }
-  }
 
-  private static class CoderProviderFromStaticMethods implements CoderProvider {
+    ////////////////////////////////////////////////////////////////////////////////
 
-    /** If true, then clazz has {@code of(TypeDescriptor)}. If false, {@code of(Class)}. */
-    private final boolean takesTypeDescriptor;
-    private final Class<?> clazz;
+    // Type raw type used to filter the incoming type on.
+    private final Class<?> rawType;
 
-    public CoderProviderFromStaticMethods(Class<?> clazz) {
-      // Note that the second condition supports older classes, which only needed to provide
-      // of(Class), not of(TypeDescriptor). Our own classes have updated to accept a
-      // TypeDescriptor. Hence the error message points only to the current specification,
-      // not both acceptable conditions.
-      checkArgument(classTakesTypeDescriptor(clazz) || classTakesClass(clazz),
-          "Class " + clazz.getCanonicalName()
-          + " is missing required static method of(TypeDescriptor).");
+    // Method to create a coder given component coders
+    // For a Coder class of kind * -> * -> ... n times ... -> *
+    // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T>
+    private final Method factoryMethod;
 
-      this.takesTypeDescriptor = classTakesTypeDescriptor(clazz);
-      this.clazz = clazz;
+    /**
+     * Returns a CoderProvider that invokes the given static factory method
+     * to create the Coder.
+     */
+    private CoderProviderFromStaticMethods(Class<?> rawType, Class<?> coderClazz) {
+      this.rawType = rawType;
+      this.factoryMethod = getFactoryMethod(coderClazz);
     }
 
-    @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
+    /**
+     * Returns the static {@code of} constructor method on {@code coderClazz}
+     * if it exists. It is assumed to have one {@link Coder} parameter for
+     * each type parameter of {@code coderClazz}.
+     */
+    private Method getFactoryMethod(Class<?> coderClazz) {
+      Method factoryMethodCandidate;
+
+      // Find the static factory method of coderClazz named 'of' with
+      // the appropriate number of type parameters.
+      int numTypeParameters = coderClazz.getTypeParameters().length;
+      Class<?>[] factoryMethodArgTypes = new Class<?>[numTypeParameters];
+      Arrays.fill(factoryMethodArgTypes, Coder.class);
       try {
-        if (takesTypeDescriptor) {
-          @SuppressWarnings("unchecked")
-          Coder<T> result = InstanceBuilder.ofType(Coder.class)
-              .fromClass(clazz)
-              .fromFactoryMethod("of")
-              .withArg(TypeDescriptor.class, type)
-              .build();
-          return result;
-        } else {
-          @SuppressWarnings("unchecked")
-          Coder<T> result = InstanceBuilder.ofType(Coder.class)
-              .fromClass(clazz)
-              .fromFactoryMethod("of")
-              .withArg(Class.class, type.getRawType())
-              .build();
-          return result;
-        }
-      } catch (RuntimeException exc) {
-        if (exc.getCause() instanceof InvocationTargetException) {
-          throw new CannotProvideCoderException(exc.getCause().getCause());
+        factoryMethodCandidate =
+            coderClazz.getDeclaredMethod("of", factoryMethodArgTypes);
+      } catch (NoSuchMethodException | SecurityException exn) {
+        throw new IllegalArgumentException(
+            "cannot register Coder " + coderClazz + ": "
+            + "does not have an accessible method named 'of' with "
+            + numTypeParameters + " arguments of Coder type",
+            exn);
+      }
+      if (!Modifier.isStatic(factoryMethodCandidate.getModifiers())) {
+        throw new IllegalArgumentException(
+            "cannot register Coder " + coderClazz + ": "
+            + "method named 'of' with " + numTypeParameters
+            + " arguments of Coder type is not static");
+      }
+      if (!coderClazz.isAssignableFrom(factoryMethodCandidate.getReturnType())) {
+        throw new IllegalArgumentException(
+            "cannot register Coder " + coderClazz + ": "
+            + "method named 'of' with " + numTypeParameters
+            + " arguments of Coder type does not return a " + coderClazz);
+      }
+      try {
+        if (!factoryMethodCandidate.isAccessible()) {
+          factoryMethodCandidate.setAccessible(true);
         }
-        throw exc;
+      } catch (SecurityException exn) {
+        throw new IllegalArgumentException(
+            "cannot register Coder " + coderClazz + ": "
+            + "method named 'of' with " + numTypeParameters
+            + " arguments of Coder type is not accessible",
+            exn);
       }
+
+      return factoryMethodCandidate;
     }
 
-    private boolean classTakesTypeDescriptor(Class<?> clazz) {
-      try {
-        clazz.getDeclaredMethod("of", TypeDescriptor.class);
-        return true;
-      } catch (NoSuchMethodException | SecurityException exc) {
-        return false;
-      }
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("rawType", rawType)
+          .add("factoryMethod", factoryMethod)
+          .toString();
     }
+  }
 
-    private boolean classTakesClass(Class<?> clazz) {
-      try {
-        clazz.getDeclaredMethod("of", Class.class);
-        return true;
-      } catch (NoSuchMethodException | SecurityException exc) {
-        return false;
+  /**
+   * See {@link #forCoder} for a detailed description of this {@link CoderProvider}.
+   */
+  private static class CoderProviderForCoder extends CoderProvider {
+    private final Coder<?> coder;
+    private final TypeDescriptor<?> type;
+
+    public CoderProviderForCoder(TypeDescriptor<?> type, Coder<?> coder){
+      this.type = type;
+      this.coder = coder;
+    }
+
+    @Override
+    public <T> Coder<T> coderFor(TypeDescriptor<T> type, List<? extends Coder<?>> componentCoders)
+        throws CannotProvideCoderException {
+      if (!this.type.equals(type)) {
+        throw new CannotProvideCoderException(String.format(
+            "Unable to provide coder for %s, this factory can only provide coders for %s",
+            type,
+            this.type));
       }
+      return (Coder) coder;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("type", type)
+          .add("coder", coder)
+          .toString();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
deleted file mode 100644
index fced976..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import java.util.ServiceLoader;
-import org.apache.beam.sdk.annotations.Experimental;
-
-/**
- * {@link Coder} creators have the ability to automatically have their
- * {@link Coder coders} registered with this SDK by creating a {@link ServiceLoader} entry
- * and a concrete implementation of this interface.
- *
- * <p>It is optional but recommended to use one of the many build time tools such as
- * {@link AutoService} to generate the necessary META-INF files automatically.
- */
-@Experimental
-public interface CoderRegistrar {
-  /**
-   * Returns a mapping of {@link Class classes} to {@link CoderFactory coder factories} which
-   * will be registered by default within each {@link CoderRegistry coder registry} instance.
-   *
-   * <p>See {@link CoderFactories} for convenience methods to construct a {@link CoderFactory}.
-   *
-   * <p>Note that a warning is logged if multiple {@link CoderRegistrar coder registrars} provide
-   * mappings for the same {@link Class}.
-   */
-  Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses();
-}


[2/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types

Posted by lc...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 7f0f632..2ba548a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -20,12 +20,10 @@ package org.apache.beam.sdk.coders;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
@@ -37,12 +35,13 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -57,102 +56,118 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A {@link CoderRegistry} allows registering the default {@link Coder} to use for a Java class,
- * and looking up and instantiating the default {@link Coder} for a Java type.
+ * A {@link CoderRegistry} allows creating a {@link Coder} for a given Java {@link Class class} or
+ * {@link TypeDescriptor type descriptor}.
  *
- * <p>{@link CoderRegistry} uses the following mechanisms to determine a default {@link Coder} for a
- * Java class, in order of precedence:
- * <ol>
- *   <li>Registration:
- *     <ul>
- *       <li>A {@link CoderFactory} can be registered to handle a particular class via
- *           {@link #registerCoder(Class, CoderFactory)}.</li>
- *       <li>A {@link Coder} class with the static methods to satisfy
- *           {@link CoderFactories#fromStaticMethods} can be registered via
- *           {@link #registerCoder(Class, Class)}.</li>
- *       <li>Types can be automatically registered via {@link CoderRegistrar coder registrars}.</li>
- *     </ul>
- *   <li>Annotations: {@link DefaultCoder} can be used to annotate a type with
- *       the default {@code Coder} type. The {@link Coder} class must satisfy the requirements
- *       of {@link CoderProviders#fromStaticMethods}.
- *   <li>Fallback: A fallback {@link CoderProvider} is used to attempt to provide a {@link Coder}
- *       for any type. By default, there is {@link SerializableCoder#PROVIDER}, which can provide a
- *       {@link Coder} for any type that is serializable via Java serialization. The fallback
- *       {@link CoderProvider} can be get and set respectively using
- *       {@link #getFallbackCoderProvider()} and {@link #setFallbackCoderProvider}. Multiple
- *       fallbacks can be chained together using {@link CoderProviders#firstOf}.
- * </ol>
+ * <p>Creation of the {@link Coder} is delegated to one of the many registered
+ * {@link CoderProvider coder providers} based upon the registration order.
+ *
+ * <p>By default, the {@link CoderProvider coder provider} precedence order is as follows:
+ * <ul>
+ *   <li>Coder providers registered programmatically with
+ *       {@link CoderRegistry#registerCoderProvider(CoderProvider)}.</li>
+ *   <li>A default coder provider for common Java (Byte, Double, List, ...) and
+ *       Apache Beam (KV, ...) types.</li>
+ *   <li>Coder providers registered automatically through a {@link CoderProviderRegistrar} using
+ *       a {@link ServiceLoader}. Note that the {@link ServiceLoader} registration order is
+ *       consistent but may change due to the addition or removal of libraries exposed
+ *       to the application. This can impact the coder returned if multiple coder providers
+ *       are capable of supplying a coder for the specified type.</li>
+ * </ul>
+ *
+ * <p>Note that if multiple {@link CoderProvider coder providers} can provide a {@link Coder} for
+ * a given type, the precedence order above defines which {@link CoderProvider} is chosen.
  */
-public class CoderRegistry implements CoderProvider {
+public class CoderRegistry {
 
   private static final Logger LOG = LoggerFactory.getLogger(CoderRegistry.class);
-  private static final Map<Class<?>, CoderFactory> REGISTERED_CODER_FACTORIES_PER_CLASS;
+  private static final List<CoderProvider> REGISTERED_CODER_FACTORIES;
+
+  /** A {@link CoderProvider} for common Java SDK and Apache Beam SDK types. */
+  private static class CommonTypes extends CoderProvider {
+    private final Map<Class<?>, CoderProvider> commonTypesToCoderProviders;
+
+    private CommonTypes() {
+      ImmutableMap.Builder<Class<?>, CoderProvider> builder = ImmutableMap.builder();
+      builder.put(Byte.class,
+          CoderProviders.fromStaticMethods(Byte.class, ByteCoder.class));
+      builder.put(BitSet.class,
+          CoderProviders.fromStaticMethods(BitSet.class, BitSetCoder.class));
+      builder.put(Double.class,
+          CoderProviders.fromStaticMethods(Double.class, DoubleCoder.class));
+      builder.put(Instant.class,
+          CoderProviders.fromStaticMethods(Instant.class, InstantCoder.class));
+      builder.put(Integer.class,
+          CoderProviders.fromStaticMethods(Integer.class, VarIntCoder.class));
+      builder.put(Iterable.class,
+          CoderProviders.fromStaticMethods(Iterable.class, IterableCoder.class));
+      builder.put(KV.class,
+          CoderProviders.fromStaticMethods(KV.class, KvCoder.class));
+      builder.put(List.class,
+          CoderProviders.fromStaticMethods(List.class, ListCoder.class));
+      builder.put(Long.class,
+          CoderProviders.fromStaticMethods(Long.class, VarLongCoder.class));
+      builder.put(Map.class,
+          CoderProviders.fromStaticMethods(Map.class, MapCoder.class));
+      builder.put(Set.class,
+          CoderProviders.fromStaticMethods(Set.class, SetCoder.class));
+      builder.put(String.class,
+          CoderProviders.fromStaticMethods(String.class, StringUtf8Coder.class));
+      builder.put(TimestampedValue.class,
+          CoderProviders.fromStaticMethods(
+              TimestampedValue.class, TimestampedValue.TimestampedValueCoder.class));
+      builder.put(Void.class,
+          CoderProviders.fromStaticMethods(Void.class, VoidCoder.class));
+      builder.put(byte[].class,
+          CoderProviders.fromStaticMethods(byte[].class, ByteArrayCoder.class));
+      builder.put(IntervalWindow.class,
+          CoderProviders.forCoder(
+              TypeDescriptor.of(IntervalWindow.class), IntervalWindow.getCoder()));
+      commonTypesToCoderProviders = builder.build();
+    }
+
+    @Override
+    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+      CoderProvider factory = commonTypesToCoderProviders.get(typeDescriptor.getRawType());
+      if (factory == null) {
+        throw new CannotProvideCoderException(
+            String.format("%s is not one of the common types.", typeDescriptor));
+      }
+      return factory.coderFor(typeDescriptor, componentCoders);
+    }
+  }
 
   static {
-    // Register the standard coders first so they are chosen as the default
-    Multimap<Class<?>, CoderFactory> codersToRegister = HashMultimap.create();
-    codersToRegister.put(Byte.class, CoderFactories.fromStaticMethods(ByteCoder.class));
-    codersToRegister.put(BitSet.class, CoderFactories.fromStaticMethods(BitSetCoder.class));
-    codersToRegister.put(Double.class, CoderFactories.fromStaticMethods(DoubleCoder.class));
-    codersToRegister.put(Instant.class, CoderFactories.fromStaticMethods(InstantCoder.class));
-    codersToRegister.put(Integer.class, CoderFactories.fromStaticMethods(VarIntCoder.class));
-    codersToRegister.put(Iterable.class, CoderFactories.fromStaticMethods(IterableCoder.class));
-    codersToRegister.put(KV.class, CoderFactories.fromStaticMethods(KvCoder.class));
-    codersToRegister.put(List.class, CoderFactories.fromStaticMethods(ListCoder.class));
-    codersToRegister.put(Long.class, CoderFactories.fromStaticMethods(VarLongCoder.class));
-    codersToRegister.put(Map.class, CoderFactories.fromStaticMethods(MapCoder.class));
-    codersToRegister.put(Set.class, CoderFactories.fromStaticMethods(SetCoder.class));
-    codersToRegister.put(String.class, CoderFactories.fromStaticMethods(StringUtf8Coder.class));
-    codersToRegister.put(TimestampedValue.class,
-        CoderFactories.fromStaticMethods(TimestampedValue.TimestampedValueCoder.class));
-    codersToRegister.put(Void.class, CoderFactories.fromStaticMethods(VoidCoder.class));
-    codersToRegister.put(byte[].class, CoderFactories.fromStaticMethods(ByteArrayCoder.class));
-    codersToRegister.put(IntervalWindow.class, CoderFactories.forCoder(IntervalWindow.getCoder()));
+    // Register the standard coders first so they are chosen over ServiceLoader ones
+    List<CoderProvider> codersToRegister = new ArrayList<>();
+    codersToRegister.add(new CommonTypes());
 
     // Enumerate all the CoderRegistrars in a deterministic order, adding all coders to register
-    Set<CoderRegistrar> registrars = Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+    Set<CoderProviderRegistrar> registrars = Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
     registrars.addAll(Lists.newArrayList(
-        ServiceLoader.load(CoderRegistrar.class, ReflectHelpers.findClassLoader())));
-    for (CoderRegistrar registrar : registrars) {
-      for (Map.Entry<Class<?>, CoderFactory> entry
-          : registrar.getCoderFactoriesToUseForClasses().entrySet()) {
-        codersToRegister.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    // Warn the user if multiple coders want to be registered for the same class
-    Map<Class<?>, Collection<CoderFactory>> multipleRegistrations =
-        Maps.filterValues(codersToRegister.asMap(), new Predicate<Collection<CoderFactory>>() {
-      @Override
-      public boolean apply(@Nonnull Collection<CoderFactory> input) {
-        return input.size() > 1;
-      }
-    });
-    for (Map.Entry<Class<?>, Collection<CoderFactory>> entry : multipleRegistrations.entrySet()) {
-      LOG.warn("Multiple CoderFactory registrations {} found for class {}, using {}.",
-          entry.getKey(), entry.getValue(), entry.getValue().iterator().next());
+        ServiceLoader.load(CoderProviderRegistrar.class, ReflectHelpers.findClassLoader())));
+    for (CoderProviderRegistrar registrar : registrars) {
+        codersToRegister.addAll(registrar.getCoderProviders());
     }
 
-    // Build a map choosing the first coder within the multimap as the default
-    ImmutableMap.Builder<Class<?>, CoderFactory> registeredCoderFactoriesPerClassBuilder =
-        ImmutableMap.builder();
-    for (Map.Entry<Class<?>, Collection<CoderFactory>> entry
-        : codersToRegister.asMap().entrySet()) {
-      registeredCoderFactoriesPerClassBuilder.put(
-          entry.getKey(), entry.getValue().iterator().next());
-    }
-    REGISTERED_CODER_FACTORIES_PER_CLASS = registeredCoderFactoriesPerClassBuilder.build();
+    REGISTERED_CODER_FACTORIES = ImmutableList.copyOf(codersToRegister);
   }
 
   /**
    * Creates a CoderRegistry containing registrations for all standard coders part of the core Java
-   * Apache Beam SDK and also any registrations provided by {@link CoderRegistrar coder registrars}.
+   * Apache Beam SDK and also any registrations provided by
+   * {@link CoderProviderRegistrar coder registrars}.
    *
-   * <p>Multiple registrations for the same class result in the (in order of precedence):
+   * <p>Multiple registrations which can produce a coder for a given type result in a Coder created
+   * by the (in order of precedence):
    * <ul>
-   *   <li>Standard coder part of the core Apache Beam Java SDK being used.</li>
-   *   <li>The coder from the {@link CoderRegistrar} with the lexicographically smallest
-   *   {@link Class#getName() class name} being used.</li>
+   *   <li>{@link CoderProvider coder providers} registered programmatically through
+   *   {@link CoderRegistry#registerCoderProvider}.</li>
+   *   <li>{@link CoderProvider coder providers} for core types found within the Apache Beam Java
+   *   SDK being used.</li>
+   *   <li>The {@link CoderProvider coder providers} from the {@link CoderProviderRegistrar}
+   *   with the lexicographically smallest {@link Class#getName() class name} being used.</li>
    * </ul>
    */
   public static CoderRegistry createDefault() {
@@ -160,97 +175,69 @@ public class CoderRegistry implements CoderProvider {
   }
 
   private CoderRegistry() {
-    coderFactoryMap = new HashMap<>(REGISTERED_CODER_FACTORIES_PER_CLASS);
-    setFallbackCoderProvider(
-        CoderProviders.firstOf(SerializableCoder.PROVIDER));
+    coderProviders = new LinkedList<>(REGISTERED_CODER_FACTORIES);
   }
 
   /**
-   * Registers {@code coderClazz} as the default {@link Coder} class to handle encoding and
-   * decoding instances of {@code clazz}, overriding prior registrations if any exist.
-   *
-   * <p>Supposing {@code T} is the static type corresponding to the {@code clazz}, then
-   * {@code coderClazz} should have a static factory method with the following signature:
-   *
-   * <pre> {@code
-   * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...)
-   * } </pre>
+   * Registers {@code coderProvider} as a potential {@link CoderProvider} which can produce
+   * {@code Coder} instances.
    *
-   * <p>This method will be called to create instances of {@code Coder<T>} for values of type
-   * {@code T}, passing Coders for each of the generic type parameters of {@code T}.  If {@code T}
-   * takes no generic type parameters, then the {@code of()} factory method should have no
-   * arguments.
+   * <p>This method prioritizes this {@link CoderProvider} over all prior registered coders.
    *
-   * <p>If {@code T} is a parameterized type, then it should additionally have a method with the
-   * following signature:
-   *
-   * <pre> {@code
-   * public static List<Object> getInstanceComponents(T exampleValue);
-   * } </pre>
-   *
-   * <p>This method will be called to decompose a value during the {@link Coder} inference process,
-   * to automatically choose {@link Coder Coders} for the components.
-   *
-   * @param clazz the class of objects to be encoded
-   * @param coderClazz a class with static factory methods to provide {@link Coder Coders}
+   * <p>See {@link CoderProviders} for common {@link CoderProvider} patterns.
    */
-  public void registerCoder(Class<?> clazz, Class<?> coderClazz) {
-    registerCoder(clazz, CoderFactories.fromStaticMethods(coderClazz));
+  public void registerCoderProvider(CoderProvider coderProvider) {
+    coderProviders.addFirst(coderProvider);
   }
 
   /**
-   * Registers {@code coderFactory} as the default {@link CoderFactory} to produce {@code Coder}
-   * instances to decode and encode instances of {@code clazz}. This will override prior
-   * registrations if any exist.
+   * Registers the provided {@link Coder} for the given class.
+   *
+   * <p>Note that this is equivalent to {@code registerCoderForType(TypeDescriptor.of(clazz))}. See
+   * {@link #registerCoderForType(TypeDescriptor, Coder)} for further details.
    */
-  public void registerCoder(Class<?> clazz, CoderFactory coderFactory) {
-    coderFactoryMap.put(clazz, coderFactory);
+  public void registerCoderForClass(Class<?> clazz, Coder<?> coder) {
+    registerCoderForType(TypeDescriptor.of(clazz), coder);
   }
 
   /**
-   * Register the provided {@link Coder} for encoding all values of the specified {@code Class}.
-   * This will override prior registrations if any exist.
+   * Registers the provided {@link Coder} for the given type.
    *
-   * <p>Not for use with generic rawtypes. Instead, register a {@link CoderFactory} via
-   * {@link #registerCoder(Class, CoderFactory)} or ensure your {@code Coder} class has the
-   * appropriate static methods and register it directly via {@link #registerCoder(Class, Class)}.
+   * <p>Note that this is equivalent to
+   * {@code registerCoderProvider(CoderProviders.forCoder(type, coder))}. See
+   * {@link #registerCoderProvider} and {@link CoderProviders#forCoder} for further details.
    */
-  public <T> void registerCoder(Class<T> rawClazz, Coder<T> coder) {
-    checkArgument(
-        rawClazz.getTypeParameters().length == 0,
-        "CoderRegistry.registerCoder(Class<T>, Coder<T>) may not be used "
-            + "with unspecialized generic classes");
-
-    CoderFactory factory = CoderFactories.forCoder(coder);
-    registerCoder(rawClazz, factory);
+  public void registerCoderForType(TypeDescriptor<?> type, Coder<?> coder) {
+    registerCoderProvider(CoderProviders.forCoder(type, coder));
   }
 
   /**
-   * Returns the {@link Coder} to use by default for values of the given type.
-   *
-   * @throws CannotProvideCoderException if there is no default Coder.
+   * Returns the {@link Coder} to use for values of the given class.
+
+   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
    */
-  public <T> Coder<T> getDefaultCoder(TypeDescriptor<T> typeDescriptor)
-      throws CannotProvideCoderException {
-    return getDefaultCoder(typeDescriptor, Collections.<Type, Coder<?>>emptyMap());
+  public <T> Coder<T> getCoder(Class<T> clazz) throws CannotProvideCoderException {
+    return getCoder(TypeDescriptor.of(clazz));
   }
 
   /**
-   * See {@link #getDefaultCoder(TypeDescriptor)}.
+   * Returns the {@link Coder} to use for values of the given type.
+   *
+   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
    */
-  @Override
-  public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
-      throws CannotProvideCoderException {
-    return getDefaultCoder(typeDescriptor);
+  public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
+    return getCoderFromTypeDescriptor(type, Collections.<Type, Coder<?>>emptyMap());
   }
 
   /**
-   * Returns the {@link Coder} to use by default for values of the given type, where the given input
+   * Returns the {@link Coder} for values of the given type, where the given input
    * type uses the given {@link Coder}.
    *
-   * @throws CannotProvideCoderException if there is no default Coder.
+   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
    */
-  public <InputT, OutputT> Coder<OutputT> getDefaultCoder(
+  @Deprecated
+  @Internal
+  public <InputT, OutputT> Coder<OutputT> getCoder(
       TypeDescriptor<OutputT> typeDescriptor,
       TypeDescriptor<InputT> inputTypeDescriptor,
       Coder<InputT> inputCoder)
@@ -258,22 +245,26 @@ public class CoderRegistry implements CoderProvider {
     checkArgument(typeDescriptor != null);
     checkArgument(inputTypeDescriptor != null);
     checkArgument(inputCoder != null);
-    return getDefaultCoder(
+    return getCoderFromTypeDescriptor(
         typeDescriptor, getTypeToCoderBindings(inputTypeDescriptor.getType(), inputCoder));
   }
 
   /**
    * Returns the {@link Coder} to use on elements produced by this function, given the {@link Coder}
    * used for its input elements.
+   *
+   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
    */
-  public <InputT, OutputT> Coder<OutputT> getDefaultOutputCoder(
+  @Deprecated
+  @Internal
+  public <InputT, OutputT> Coder<OutputT> getOutputCoder(
       SerializableFunction<InputT, OutputT> fn, Coder<InputT> inputCoder)
       throws CannotProvideCoderException {
 
     ParameterizedType fnType = (ParameterizedType)
         TypeDescriptor.of(fn.getClass()).getSupertype(SerializableFunction.class).getType();
 
-    return getDefaultCoder(
+    return getCoder(
         fn.getClass(),
         SerializableFunction.class,
         ImmutableMap.of(fnType.getActualTypeArguments()[0], inputCoder),
@@ -284,9 +275,11 @@ public class CoderRegistry implements CoderProvider {
    * Returns the {@link Coder} to use for the specified type parameter specialization of the
    * subclass, given {@link Coder Coders} to use for all other type parameters (if any).
    *
-   * @throws CannotProvideCoderException if there is no default Coder.
+   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
    */
-  public <T, OutputT> Coder<OutputT> getDefaultCoder(
+  @Deprecated
+  @Internal
+  public <T, OutputT> Coder<OutputT> getCoder(
       Class<? extends T> subClass,
       Class<T> baseClass,
       Map<Type, ? extends Coder<?>> knownCoders,
@@ -305,147 +298,11 @@ public class CoderRegistry implements CoderProvider {
     }
   }
 
-  /**
-   * Returns the {@link Coder} to use for the provided example value, if it can be determined.
-   *
-   * @throws CannotProvideCoderException if there is no default {@link Coder} or
-   * more than one {@link Coder} matches
-   */
-  public <T> Coder<T> getDefaultCoder(T exampleValue) throws CannotProvideCoderException {
-    Class<?> clazz = exampleValue == null ? Void.class : exampleValue.getClass();
-
-    if (clazz.getTypeParameters().length == 0) {
-      // Trust that getDefaultCoder returns a valid
-      // Coder<T> for non-generic clazz.
-      @SuppressWarnings("unchecked")
-      Coder<T> coder = (Coder<T>) getDefaultCoder(clazz);
-      return coder;
-    } else {
-      CoderFactory factory = getDefaultCoderFactory(clazz);
-
-      List<Object> components = factory.getInstanceComponents(exampleValue);
-      if (components == null) {
-        throw new CannotProvideCoderException(String.format(
-            "Cannot provide coder based on value with class %s: The registered CoderFactory with "
-            + "class %s failed to decompose the value, which is required in order to provide "
-            + "Coders for the components.",
-            clazz.getCanonicalName(), factory.getClass().getCanonicalName()));
-      }
-
-      // componentcoders = components.map(this.getDefaultCoder)
-      List<Coder<?>> componentCoders = new ArrayList<>();
-      for (Object component : components) {
-        try {
-          Coder<?> componentCoder = getDefaultCoder(component);
-          componentCoders.add(componentCoder);
-        } catch (CannotProvideCoderException exc) {
-          throw new CannotProvideCoderException(
-              String.format("Cannot provide coder based on value with class %s",
-                  clazz.getCanonicalName()),
-              exc);
-        }
-      }
-
-      // Trust that factory.create maps from valid component Coders
-      // to a valid Coder<T>.
-      @SuppressWarnings("unchecked")
-      Coder<T> coder = (Coder<T>) factory.create(componentCoders);
-      return coder;
-    }
-  }
-
-  /**
-   * Returns the {@link Coder} to use by default for values of the given class. The following three
-   * sources for a {@link Coder} will be attempted, in order:
-   *
-   * <ol>
-   *   <li>A {@link Coder} class registered explicitly via a call to {@link #registerCoder},
-   *   <li>A {@link DefaultCoder} annotation on the class,
-   *   <li>This registry's fallback {@link CoderProvider}, which may be able to generate a
-   *   {@link Coder} for an arbitrary class.
-   * </ol>
-   *
-   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
-   */
-  public <T> Coder<T> getDefaultCoder(Class<T> clazz) throws CannotProvideCoderException {
-
-    CannotProvideCoderException factoryException;
-    try {
-      CoderFactory coderFactory = getDefaultCoderFactory(clazz);
-      LOG.debug("Default coder for {} found by factory", clazz);
-      @SuppressWarnings("unchecked")
-      Coder<T> coder = (Coder<T>) coderFactory.create(Collections.<Coder<?>>emptyList());
-      return coder;
-    } catch (CannotProvideCoderException exc) {
-      factoryException = exc;
-    }
-
-    CannotProvideCoderException annotationException;
-    try {
-      return getDefaultCoderFromAnnotation(clazz);
-    } catch (CannotProvideCoderException exc) {
-      annotationException = exc;
-    }
-
-    CannotProvideCoderException fallbackException;
-    if (getFallbackCoderProvider() != null) {
-      try {
-        return getFallbackCoderProvider().getCoder(TypeDescriptor.<T>of(clazz));
-      } catch (CannotProvideCoderException exc) {
-        fallbackException = exc;
-      }
-    } else {
-      fallbackException = new CannotProvideCoderException("no fallback CoderProvider configured");
-    }
-
-    // Build up the error message and list of causes.
-    StringBuilder messageBuilder = new StringBuilder()
-        .append("Unable to provide a default Coder for ").append(clazz.getCanonicalName())
-        .append(". Correct one of the following root causes:");
-
-    messageBuilder
-        .append("\n  Building a Coder using a registered CoderFactory failed: ")
-        .append(factoryException.getMessage());
-
-    messageBuilder
-        .append("\n  Building a Coder from the @DefaultCoder annotation failed: ")
-        .append(annotationException.getMessage());
-
-    messageBuilder
-        .append("\n  Building a Coder from the fallback CoderProvider failed: ")
-        .append(fallbackException.getMessage());
-
-    throw new CannotProvideCoderException(messageBuilder.toString());
-  }
-
-  /**
-   * Sets the fallback {@link CoderProvider} for this registry. If no other method succeeds in
-   * providing a {@code Coder<T>} for a type {@code T}, then the registry will attempt to create
-   * a {@link Coder} using this {@link CoderProvider}.
-   *
-   * <p>By default, this is set to {@link SerializableCoder#PROVIDER}.
-   *
-   * <p>See {@link #getFallbackCoderProvider}.
-   */
-  public void setFallbackCoderProvider(CoderProvider coderProvider) {
-    fallbackCoderProvider = coderProvider;
-  }
-
-  /**
-   * Returns the fallback {@link CoderProvider} for this registry.
-   *
-   * <p>See {@link #setFallbackCoderProvider}.
-   */
-  public CoderProvider getFallbackCoderProvider() {
-    return fallbackCoderProvider;
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**
    * Returns a {@code Map} from each of {@code baseClass}'s type parameters to the {@link Coder} to
-   * use by default for it, in the context of {@code subClass}'s specialization of
-   * {@code baseClass}.
+   * use for it, in the context of {@code subClass}'s specialization of {@code baseClass}.
    *
    * <p>If no {@link Coder} can be inferred for a particular type parameter, then that type variable
    * will be absent from the returned {@code Map}.
@@ -465,7 +322,7 @@ public class CoderRegistry implements CoderProvider {
    * <p>For this reason {@code getDefaultCoders} (plural) does not throw an exception if a
    * {@link Coder} for a particular type variable cannot be inferred, but merely omits the entry
    * from the returned {@code Map}. It is the responsibility of the caller (usually
-   * {@link #getDefaultCoder} to extract the desired coder or throw a
+   * {@link #getCoderFromTypeDescriptor} to extract the desired coder or throw a
    * {@link CannotProvideCoderException} when appropriate.
    *
    * @param subClass the concrete type whose specializations are being inferred
@@ -495,8 +352,7 @@ public class CoderRegistry implements CoderProvider {
 
   /**
    * Returns an array listing, for each of {@code baseClass}'s type parameters, the {@link Coder} to
-   * use by default for it, in the context of {@code subClass}'s specialization of
-   * {@code baseClass}.
+   * use for it, in the context of {@code subClass}'s specialization of {@code baseClass}.
    *
    * <p>If a {@link Coder} cannot be inferred for a type variable, its slot in the resulting array
    * will be {@code null}.
@@ -518,7 +374,7 @@ public class CoderRegistry implements CoderProvider {
    * <p>For this reason {@code getDefaultCoders} (plural) does not throw an exception if a
    * {@link Coder} for a particular type variable cannot be inferred. Instead, it results in a
    * {@code null} in the array. It is the responsibility of the caller (usually
-   * {@link #getDefaultCoder} to extract the desired coder or throw a
+   * {@link #getCoderFromTypeDescriptor} to extract the desired coder or throw a
    * {@link CannotProvideCoderException} when appropriate.
    *
    * @param subClass the concrete type whose specializations are being inferred
@@ -571,7 +427,7 @@ public class CoderRegistry implements CoderProvider {
         result[i] = knownCoders[i];
       } else {
         try {
-          result[i] = getDefaultCoder(typeArgs[i], context);
+          result[i] = getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgs[i]), context);
         } catch (CannotProvideCoderException exc) {
           result[i] = null;
         }
@@ -695,89 +551,31 @@ public class CoderRegistry implements CoderProvider {
   }
 
   /**
-   * The map of classes to the CoderFactories to use to create their
-   * default Coders.
+   * The list of {@link CoderProvider coder providers} to use to provide Coders.
    */
-  private Map<Class<?>, CoderFactory> coderFactoryMap;
-
-  /**
-   * A provider of coders for types where no coder is registered.
-   */
-  private CoderProvider fallbackCoderProvider;
-
-  /**
-   * Returns the {@link CoderFactory} to use to create default {@link Coder Coders} for instances of
-   * the given class, or {@code null} if there is no default {@link CoderFactory} registered.
-   */
-  private CoderFactory getDefaultCoderFactory(Class<?> clazz) throws CannotProvideCoderException {
-    CoderFactory coderFactoryOrNull = coderFactoryMap.get(clazz);
-    if (coderFactoryOrNull != null) {
-      return coderFactoryOrNull;
-    } else {
-      throw new CannotProvideCoderException(
-          String.format("Cannot provide coder based on value with class %s: No CoderFactory has "
-              + "been registered for the class.", clazz.getCanonicalName()));
-    }
-  }
-
-  /**
-   * Returns the {@link Coder} returned according to the {@link CoderProvider} from any
-   * {@link DefaultCoder} annotation on the given class.
-   */
-  private <T> Coder<T> getDefaultCoderFromAnnotation(Class<T> clazz)
-      throws CannotProvideCoderException {
-    DefaultCoder defaultAnnotation = clazz.getAnnotation(DefaultCoder.class);
-    if (defaultAnnotation == null) {
-      throw new CannotProvideCoderException(
-          String.format("Class %s does not have a @DefaultCoder annotation.",
-              clazz.getCanonicalName()));
-    }
-
-    LOG.debug("DefaultCoder annotation found for {} with value {}",
-        clazz, defaultAnnotation.value());
-    CoderProvider coderProvider = CoderProviders.fromStaticMethods(defaultAnnotation.value());
-    return coderProvider.getCoder(TypeDescriptor.of(clazz));
-  }
-
-  /**
-   * Returns the {@link Coder} to use by default for values of the given type,
-   * in a context where the given types use the given coders.
-   *
-   * @throws CannotProvideCoderException if a coder cannot be provided
-   */
-  private <T> Coder<T> getDefaultCoder(
-      TypeDescriptor<T> typeDescriptor,
-      Map<Type, Coder<?>> typeCoderBindings)
-      throws CannotProvideCoderException {
-
-    Coder<?> defaultCoder = getDefaultCoder(typeDescriptor.getType(), typeCoderBindings);
-    LOG.debug("Default coder for {}: {}", typeDescriptor, defaultCoder);
-    @SuppressWarnings("unchecked")
-    Coder<T> result = (Coder<T>) defaultCoder;
-    return result;
-  }
+  private LinkedList<CoderProvider> coderProviders;
 
   /**
-   * Returns the {@link Coder} to use by default for values of the given type,
+   * Returns a {@link Coder} to use for values of the given type,
    * in a context where the given types use the given coders.
    *
    * @throws CannotProvideCoderException if a coder cannot be provided
    */
-  private Coder<?> getDefaultCoder(Type type, Map<Type, Coder<?>> typeCoderBindings)
+  private <T> Coder<T> getCoderFromTypeDescriptor(
+      TypeDescriptor<T> typeDescriptor, Map<Type, Coder<?>> typeCoderBindings)
       throws CannotProvideCoderException {
-    Coder<?> coder = typeCoderBindings.get(type);
-    if (coder != null) {
-      return coder;
-    }
-    if (type instanceof Class<?>) {
-      Class<?> clazz = (Class<?>) type;
-      return getDefaultCoder(clazz);
+    Type type = typeDescriptor.getType();
+    Coder<?> coder;
+    if (typeCoderBindings.containsKey(type)) {
+      coder = typeCoderBindings.get(type);
+    } else if (type instanceof Class<?>) {
+      coder = getCoderFromFactories(typeDescriptor, Collections.<Coder<?>>emptyList());
     } else if (type instanceof ParameterizedType) {
-      return getDefaultCoder((ParameterizedType) type, typeCoderBindings);
+      coder = getCoderFromParameterizedType((ParameterizedType) type, typeCoderBindings);
     } else if (type instanceof TypeVariable) {
-      return getDefaultCoder(TypeDescriptor.of(type).getRawType());
+      coder = getCoderFromFactories(typeDescriptor, Collections.<Coder<?>>emptyList());
     } else if (type instanceof WildcardType) {
-      // No default coder for an unknown generic type.
+      // No coder for an unknown generic type.
       throw new CannotProvideCoderException(
           String.format("Cannot provide a coder for type variable %s"
           + " (declared by %s) because the actual type is unknown due to erasure.",
@@ -788,72 +586,70 @@ public class CoderRegistry implements CoderProvider {
       throw new RuntimeException(
           "Internal error: unexpected kind of Type: " + type);
     }
+
+    LOG.debug("Coder for {}: {}", typeDescriptor, coder);
+    @SuppressWarnings("unchecked")
+    Coder<T> result = (Coder<T>) coder;
+    return result;
   }
 
   /**
-   * Returns the {@link Coder} to use by default for values of the given
+   * Returns a {@link Coder} to use for values of the given
    * parameterized type, in a context where the given types use the
    * given {@link Coder Coders}.
    *
    * @throws CannotProvideCoderException if no coder can be provided
    */
-  private Coder<?> getDefaultCoder(
+  private Coder<?> getCoderFromParameterizedType(
       ParameterizedType type,
       Map<Type, Coder<?>> typeCoderBindings)
           throws CannotProvideCoderException {
 
-    CannotProvideCoderException factoryException;
-    try {
-      return getDefaultCoderFromFactory(type, typeCoderBindings);
-    } catch (CannotProvideCoderException exc) {
-      factoryException = exc;
-    }
-
-    CannotProvideCoderException annotationException;
-    try {
-      Class<?> rawClazz = (Class<?>) type.getRawType();
-      return getDefaultCoderFromAnnotation(rawClazz);
-    } catch (CannotProvideCoderException exc) {
-      annotationException = exc;
-    }
-
-    // Build up the error message and list of causes.
-    StringBuilder messageBuilder = new StringBuilder()
-        .append("Unable to provide a default Coder for ").append(type)
-        .append(". Correct one of the following root causes:");
-
-    messageBuilder
-        .append("\n  Building a Coder using a registered CoderFactory failed: ")
-        .append(factoryException.getMessage());
-
-    messageBuilder
-        .append("\n  Building a Coder from the @DefaultCoder annotation failed: ")
-        .append(annotationException.getMessage());
-
-    throw new CannotProvideCoderException(messageBuilder.toString());
-  }
-
-  private Coder<?> getDefaultCoderFromFactory(
-      ParameterizedType type,
-      Map<Type, Coder<?>> typeCoderBindings)
-          throws CannotProvideCoderException {
-    Class<?> rawClazz = (Class<?>) type.getRawType();
-    CoderFactory coderFactory = getDefaultCoderFactory(rawClazz);
     List<Coder<?>> typeArgumentCoders = new ArrayList<>();
     for (Type typeArgument : type.getActualTypeArguments()) {
       try {
-        Coder<?> typeArgumentCoder = getDefaultCoder(typeArgument,
-                                                     typeCoderBindings);
+        Coder<?> typeArgumentCoder =
+            getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings);
         typeArgumentCoders.add(typeArgumentCoder);
       } catch (CannotProvideCoderException exc) {
-         throw new CannotProvideCoderException(
-          String.format("Cannot provide coder for parameterized type %s: %s",
-              type,
-              exc.getMessage()),
-          exc);
+        throw new CannotProvideCoderException(
+            String.format("Cannot provide coder for parameterized type %s: %s",
+                type,
+                exc.getMessage()),
+            exc);
       }
     }
-    return coderFactory.create(typeArgumentCoders);
+    return getCoderFromFactories(TypeDescriptor.of(type), typeArgumentCoders);
+  }
+
+  /**
+   * Attempts to create a {@link Coder} from any registered {@link CoderProvider} returning
+   * the first successfully created instance.
+   */
+  private Coder<?> getCoderFromFactories(
+      TypeDescriptor<?> typeDescriptor, List<Coder<?>> typeArgumentCoders)
+      throws CannotProvideCoderException {
+    List<CannotProvideCoderException> suppressedExceptions = new ArrayList<>();
+    for (CoderProvider coderProvider : coderProviders) {
+      try {
+        return coderProvider.coderFor(typeDescriptor, typeArgumentCoders);
+      } catch (CannotProvideCoderException e) {
+        // Add all failures as suppressed exceptions.
+        suppressedExceptions.add(e);
+      }
+    }
+
+    // Build up the error message and list of causes.
+    StringBuilder messageBuilder = new StringBuilder()
+        .append("Unable to provide a Coder for ").append(typeDescriptor).append(".\n")
+        .append("  Building a Coder using a registered CoderProvider failed.\n")
+        .append("  See suppressed exceptions for detailed failures.");
+    CannotProvideCoderException exceptionOnFailure =
+        new CannotProvideCoderException(messageBuilder.toString());
+    for (CannotProvideCoderException suppressedException : suppressedExceptions) {
+      exceptionOnFailure.addSuppressed(suppressedException);
+    }
+    throw exceptionOnFailure;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
index 523b69b..1762308 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
@@ -46,15 +46,6 @@ public class CollectionCoder<T> extends IterableLikeCoder<T, Collection<T>> {
     return decodedElements;
   }
 
-  /**
-   * Returns the first element in this collection if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(
-      Collection<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
   protected CollectionCoder(Coder<T> elemCoder) {
     super(elemCoder, "Collection");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index f33e210..edbaa7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -94,14 +94,6 @@ public abstract class CustomCoder<T> extends Coder<T>
   }
 
   /**
-   * Returns an empty list. A {@link CustomCoder} by default will not have component coders that are
-   * used for inference.
-   */
-  public static <T> List<Object> getInstanceComponents(T exampleValue) {
-    return Collections.emptyList();
-  }
-
-  /**
    * {@inheritDoc}
    *
    * @throws NonDeterministicException a {@link CustomCoder} is presumed

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
index 9a976f9..6eff9e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
@@ -17,45 +17,33 @@
  */
 package org.apache.beam.sdk.coders;
 
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * The {@link DefaultCoder} annotation
- * specifies a default {@link Coder} class to handle encoding and decoding
- * instances of the annotated class.
+ * The {@link DefaultCoder} annotation specifies a {@link Coder} class to handle encoding and
+ * decoding instances of the annotated class.
  *
- * <p>The specified {@link Coder} must satisfy the requirements of
- * {@link CoderProviders#fromStaticMethods}. Two classes provided by the SDK that
- * are intended for use with this annotation include {@link SerializableCoder}
- * and {@link AvroCoder}.
+ * <p>The specified {@link Coder} must have the following method:
+ * <pre>
+ * {@code public static CoderProvider getCoderProvider()}.
+ * </pre>
  *
- * <p>To configure the use of Java serialization as the default
- * for a class, annotate the class to use
- * {@link SerializableCoder} as follows:
- *
- * <pre><code>{@literal @}DefaultCoder(SerializableCoder.class)
- * public class MyCustomDataType implements Serializable {
- *   // ...
- * }</code></pre>
- *
- * <p>Similarly, to configure the use of
- * {@link AvroCoder} as the default:
- * <pre><code>{@literal @}DefaultCoder(AvroCoder.class)
- * public class MyCustomDataType {
- *   public MyCustomDataType() {}  // Avro requires an empty constructor.
- *   // ...
- * }</code></pre>
- *
- * <p>Coders specified explicitly via
- * {@link PCollection#setCoder}
- * take precedence, followed by Coders registered at runtime via
- * {@link CoderRegistry#registerCoder}. See {@link CoderRegistry} for a more detailed discussion
- * of the precedence rules.
+ * <p>Coders specified explicitly via {@link PCollection#setCoder} take precedence, followed by
+ * Coders found at runtime via {@link CoderRegistry#getCoder}.
+ * See {@link CoderRegistry} for a more detailed discussion of the precedence rules.
  */
 @Documented
 @Retention(RetentionPolicy.RUNTIME)
@@ -63,4 +51,77 @@ import org.apache.beam.sdk.values.PCollection;
 @SuppressWarnings("rawtypes")
 public @interface DefaultCoder {
   Class<? extends Coder> value();
+
+  /**
+   * A {@link CoderProviderRegistrar} that registers a {@link CoderProvider} which can use
+   * the {@code @DefaultCoder} annotation to provide {@link CoderProvider coder providers} that
+   * creates {@link Coder}s.
+   */
+  @AutoService(CoderProviderRegistrar.class)
+  class DefaultCoderProviderRegistrar implements CoderProviderRegistrar {
+
+    @Override
+    public List<CoderProvider> getCoderProviders() {
+      return ImmutableList.<CoderProvider>of(new DefaultCoderProvider());
+    }
+
+    /**
+     * A {@link CoderProvider} that uses the {@code @DefaultCoder} annotation to provide
+     * {@link CoderProvider coder providers} that create {@link Coder}s.
+     */
+    static class DefaultCoderProvider extends CoderProvider {
+      private static final Logger LOG = LoggerFactory.getLogger(DefaultCoderProvider.class);
+
+      /**
+       * Returns the {@link Coder} returned according to the {@link CoderProvider} from any
+       * {@link DefaultCoder} annotation on the given class.
+       */
+      @Override
+      public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+          List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+
+        Class<?> clazz = typeDescriptor.getRawType();
+        DefaultCoder defaultAnnotation = clazz.getAnnotation(DefaultCoder.class);
+        if (defaultAnnotation == null) {
+          throw new CannotProvideCoderException(
+              String.format("Class %s does not have a @DefaultCoder annotation.",
+                  clazz.getName()));
+        }
+
+        if (defaultAnnotation.value() == null) {
+          throw new CannotProvideCoderException(
+              String.format("Class %s has a @DefaultCoder annotation with a null value.",
+                  clazz.getName()));
+        }
+
+        LOG.debug("DefaultCoder annotation found for {} with value {}",
+            clazz, defaultAnnotation.value());
+
+        Method coderProviderMethod;
+        try {
+          coderProviderMethod = defaultAnnotation.value().getMethod("getCoderProvider");
+        } catch (NoSuchMethodException e) {
+          throw new CannotProvideCoderException(String.format(
+              "Unable to find 'public static CoderProvider getCoderProvider()' on %s",
+              defaultAnnotation.value()),
+              e);
+        }
+
+        CoderProvider coderProvider;
+        try {
+          coderProvider = (CoderProvider) coderProviderMethod.invoke(null);
+        } catch (IllegalAccessException
+            | IllegalArgumentException
+            | InvocationTargetException
+            | NullPointerException
+            | ExceptionInInitializerError e) {
+          throw new CannotProvideCoderException(String.format(
+              "Unable to invoke 'public static CoderProvider getCoderProvider()' on %s",
+              defaultAnnotation.value()),
+              e);
+        }
+        return coderProvider.coderFor(typeDescriptor, componentCoders);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index 02c3d0f..b600b1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -41,15 +41,6 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
     return decodedElements;
   }
 
-  /**
-   * Returns the first element in this iterable if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(
-      Iterable<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
   protected IterableCoder(Coder<T> elemCoder) {
     super(elemCoder, "Iterable");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 8e10ca2..52b9c34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -75,18 +75,6 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
   private final Coder<T> elementCoder;
   private final String iterableName;
 
-  /**
-   * Returns the first element in the iterable-like {@code exampleValue} if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  protected static <T, IterableT extends Iterable<T>>
-      List<Object> getInstanceComponentsHelper(IterableT exampleValue) {
-    for (T value : exampleValue) {
-      return Arrays.<Object>asList(value);
-    }
-    return null;
-  }
-
   protected IterableLikeCoder(Coder<T> elementCoder, String  iterableName) {
     checkArgument(elementCoder != null, "element Coder for IterableLikeCoder must not be null");
     checkArgument(iterableName != null, "iterable name for IterableLikeCoder must not be null");

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 35b7449..da7f03c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -39,13 +39,6 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
     return new KvCoder<>(keyCoder, valueCoder);
   }
 
-  public static <K, V> List<Object> getInstanceComponents(
-      KV<K, V> exampleValue) {
-    return Arrays.asList(
-        exampleValue.getKey(),
-        exampleValue.getValue());
-  }
-
   public Coder<K> getKeyCoder() {
     return keyCoder;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
index 70bbf93..25f3ee9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
@@ -40,14 +40,6 @@ public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
     return decodedElements;
   }
 
-  /**
-   * Returns the first element in this list if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(List<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
   protected ListCoder(Coder<T> elemCoder) {
     super(elemCoder, "List");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index da2bf50..9e3c768 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -50,18 +50,6 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
     return new MapCoder<>(keyCoder, valueCoder);
   }
 
-  /**
-   * Returns the key and value for an arbitrary element of this map,
-   * if it is non-empty, otherwise returns {@code null}.
-   */
-   public static <K, V> List<Object> getInstanceComponents(
-       Map<K, V> exampleValue) {
-     for (Map.Entry<K, V> entry : exampleValue.entrySet()) {
-       return Arrays.asList(entry.getKey(), entry.getValue());
-     }
-     return null;
-   }
-
   public Coder<K> getKeyCoder() {
     return keyCoder;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index b52b9db..e3b2959 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.sdk.coders;
 
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.List;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -63,29 +66,45 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
   }
 
   /**
-   * A {@link CoderProvider} that constructs a {@link SerializableCoder}
-   * for any class that implements serializable.
+   * Returns a {@link CoderProvider} which uses the {@link SerializableCoder} if possible for
+   * all types.
+   *
+   * <p>This method is invoked reflectively from {@link DefaultCoder}.
    */
-  public static final CoderProvider PROVIDER = new CoderProvider() {
+  @SuppressWarnings("unused")
+  public static CoderProvider getCoderProvider() {
+    return new SerializableCoderProvider();
+  }
+
+  /**
+   * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
+   * serializable types.
+   */
+  @AutoService(CoderProviderRegistrar.class)
+  public static class SerializableCoderProviderRegistrar implements CoderProviderRegistrar {
+
     @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
-        throws CannotProvideCoderException {
-      Class<?> clazz = typeDescriptor.getRawType();
-      if (Serializable.class.isAssignableFrom(clazz)) {
-        @SuppressWarnings("unchecked")
-        Class<? extends Serializable> serializableClazz =
-            (Class<? extends Serializable>) clazz;
-        @SuppressWarnings("unchecked")
-        Coder<T> coder = (Coder<T>) SerializableCoder.of(serializableClazz);
-        return coder;
-      } else {
-        throw new CannotProvideCoderException(
-            "Cannot provide SerializableCoder because " + typeDescriptor
-            + " does not implement Serializable");
-      }
+    public List<CoderProvider> getCoderProviders() {
+      return ImmutableList.of(getCoderProvider());
     }
-  };
+  }
 
+  /**
+   * A {@link CoderProvider} that constructs a {@link SerializableCoder} for any class that
+   * implements serializable.
+   */
+  static class SerializableCoderProvider extends CoderProvider {
+    @Override
+    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+      if (Serializable.class.isAssignableFrom(typeDescriptor.getRawType())) {
+        return SerializableCoder.of((TypeDescriptor) typeDescriptor);
+      }
+      throw new CannotProvideCoderException(
+          "Cannot provide SerializableCoder because " + typeDescriptor
+              + " does not implement Serializable");
+    }
+  }
 
   private final Class<T> type;
   private final TypeDescriptor<T> typeDescriptor;

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
index da16165..baec128 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
@@ -56,15 +56,6 @@ public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
         new TypeParameter<T>() {}, getElemCoder().getEncodedTypeDescriptor());
   }
 
-  /**
-   * Returns the first element in this set if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(
-      Set<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
   /////////////////////////////////////////////////////////////////////////////
   // Internal operations below here.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
index 7fc094f..9a7b125 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
@@ -37,13 +37,6 @@ public class VarLongCoder extends StructuredCoder<Long> {
     return INSTANCE;
   }
 
-  /**
-   * Returns an empty list. {@link VarLongCoder} has no components.
-   */
-  public static <T> List<Object> getInstanceComponents(T ignored) {
-    return Collections.emptyList();
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   private static final VarLongCoder INSTANCE = new VarLongCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
index 29990cd..20ec300 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
@@ -125,14 +125,14 @@ public class CombineFnBase {
     @Override
     public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
         throws CannotProvideCoderException {
-      return registry.getDefaultCoder(getClass(), AbstractGlobalCombineFn.class,
+      return registry.getCoder(getClass(), AbstractGlobalCombineFn.class,
           ImmutableMap.<Type, Coder<?>>of(getInputTVariable(), inputCoder), getAccumTVariable());
     }
 
     @Override
     public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<InputT> inputCoder)
         throws CannotProvideCoderException {
-      return registry.getDefaultCoder(getClass(), AbstractGlobalCombineFn.class,
+      return registry.getCoder(getClass(), AbstractGlobalCombineFn.class,
           ImmutableMap.<Type, Coder<?>>of(getInputTVariable(), inputCoder, getAccumTVariable(),
               this.getAccumulatorCoder(registry, inputCoder)),
           getOutputTVariable());

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index d4c97bc..0515ed5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -339,7 +339,7 @@ public class CombineFns {
       List<Coder<Object>> coders = Lists.newArrayList();
       for (int i = 0; i < combineFnCount; ++i) {
         Coder<Object> inputCoder =
-            registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
+            registry.getOutputCoder(extractInputFns.get(i), dataCoder);
         coders.add(combineFns.get(i).getAccumulatorCoder(registry, inputCoder));
       }
       return new ComposedAccumulatorCoder(coders);
@@ -478,7 +478,7 @@ public class CombineFns {
       List<Coder<Object>> coders = Lists.newArrayList();
       for (int i = 0; i < combineFnCount; ++i) {
         Coder<Object> inputCoder =
-            registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
+            registry.getOutputCoder(extractInputFns.get(i), dataCoder);
         coders.add(combineFnWithContexts.get(i).getAccumulatorCoder(registry, inputCoder));
       }
       return new ComposedAccumulatorCoder(coders);

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 5624f2f..7af8fb8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -27,17 +27,25 @@ import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CollectionCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.OffsetBasedSource;
@@ -317,7 +325,7 @@ public class Create<T> {
       if (coder.isPresent()) {
         return coder.get();
       } else if (typeDescriptor.isPresent()) {
-        return input.getPipeline().getCoderRegistry().getDefaultCoder(typeDescriptor.get());
+        return input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
       } else {
         return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), elems);
       }
@@ -566,7 +574,7 @@ public class Create<T> {
       if (elementCoder.isPresent()) {
         return elementCoder.get();
       } else if (typeDescriptor.isPresent()) {
-        return input.getPipeline().getCoderRegistry().getDefaultCoder(typeDescriptor.get());
+        return input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
       } else {
         Iterable<T> rawElements =
             Iterables.transform(
@@ -611,7 +619,7 @@ public class Create<T> {
     if (elementClazz.getTypeParameters().length == 0) {
       try {
         @SuppressWarnings("unchecked") // elementClazz is a wildcard type
-        Coder<T> coder = (Coder<T>) registry.getDefaultCoder(TypeDescriptor.of(elementClazz));
+        Coder<T> coder = (Coder<T>) registry.getCoder(TypeDescriptor.of(elementClazz));
         return coder;
       } catch (CannotProvideCoderException exc) {
         // Can't get a coder from the class of the elements, try with the elements next
@@ -619,11 +627,20 @@ public class Create<T> {
     }
 
     // If that fails, try to deduce a coder using the elements themselves
-    Optional<Coder<T>> coder = Optional.absent();
-    for (T elem : elems) {
-      Coder<T> c = registry.getDefaultCoder(elem);
+    return (Coder<T>) inferCoderFromObjects(registry, elems);
+  }
+
+  /**
+   * Attempts to infer the {@link Coder} of the elements ensuring that the returned coder is
+   * equivalent for all elements.
+   */
+  private static Coder<?> inferCoderFromObjects(
+      CoderRegistry registry, Iterable<?> elems) throws CannotProvideCoderException {
+    Optional<Coder<?>> coder = Optional.absent();
+    for (Object elem : elems) {
+      Coder<?> c = inferCoderFromObject(registry, elem);
       if (!coder.isPresent()) {
-        coder = Optional.of(c);
+        coder = (Optional) Optional.of(c);
       } else if (!Objects.equals(c, coder.get())) {
         throw new CannotProvideCoderException(
             "Cannot provide coder for elements of "
@@ -633,11 +650,48 @@ public class Create<T> {
                 + " Based on their values, they do not all default to the same Coder.");
       }
     }
+    if (coder.isPresent()) {
+      return coder.get();
+    }
 
-    if (!coder.isPresent()) {
-      throw new CannotProvideCoderException(
-          "Unable to infer a coder. Please register " + "a coder for ");
+    throw new CannotProvideCoderException("Cannot provide coder for elements of "
+        + Create.class.getSimpleName()
+        + ":"
+        + " For their common class, no coder could be provided."
+        + " Based on their values, no coder could be inferred.");
+  }
+
+  /**
+   * Attempt to infer the type for some very common Apache Beam parameterized types.
+   *
+   * <p>TODO: Instead, build a TypeDescriptor so that the {@link CoderRegistry} is invoked
+   * for the type instead of hard coding the coders for common types.
+   */
+  private static Coder<?> inferCoderFromObject(CoderRegistry registry, Object o)
+      throws CannotProvideCoderException {
+    if (o == null) {
+      return VoidCoder.of();
+    } else if (o instanceof TimestampedValue) {
+      return TimestampedValueCoder.of(
+          inferCoderFromObject(registry, ((TimestampedValue) o).getValue()));
+    } else if (o instanceof List) {
+      return ListCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+    } else if (o instanceof Set) {
+      return SetCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+    } else if (o instanceof Collection) {
+      return CollectionCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+    } else if (o instanceof Iterable) {
+      return IterableCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+    } else if (o instanceof Map) {
+      return MapCoder.of(
+          inferCoderFromObjects(registry, ((Map) o).keySet()),
+          inferCoderFromObjects(registry, ((Map) o).entrySet()));
+    } else if (o instanceof KV) {
+      return KvCoder.of(
+          inferCoderFromObject(registry, ((KV) o).getKey()),
+          inferCoderFromObject(registry, ((KV) o).getValue()));
+    } else {
+      return registry.getCoder(o.getClass());
     }
-    return coder.get();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 15abd98..d5df944 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -117,7 +117,7 @@ import org.apache.beam.sdk.values.TupleTag;
  * mapping from Java types to the default Coder to use, for a standard
  * set of Java types; users can extend this mapping for additional
  * types, via
- * {@link org.apache.beam.sdk.coders.CoderRegistry#registerCoder}.
+ * {@link org.apache.beam.sdk.coders.CoderRegistry#registerCoderProvider}.
  * If this inference process fails, either because the Java type was
  * not known at run-time (e.g., due to Java's "erasure" of generic
  * types) or there was no default Coder registered, then the Coder
@@ -281,7 +281,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
    * @throws CannotProvideCoderException if no coder can be inferred
    */
   protected Coder<?> getDefaultOutputCoder() throws CannotProvideCoderException {
-    throw new CannotProvideCoderException("PTransform.getDefaultOutputCoder called.");
+    throw new CannotProvideCoderException("PTransform.getOutputCoder called.");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index e67dbe1..edf1419 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -477,10 +477,10 @@ public class ParDo {
       Type typeArgument = typeArguments[i];
       TypeDescriptor<?> typeDescriptor = TypeDescriptor.of(typeArgument);
       try {
-        coders[i] = coderRegistry.getDefaultCoder(typeDescriptor);
+        coders[i] = coderRegistry.getCoder(typeDescriptor);
       } catch (CannotProvideCoderException e) {
         try {
-          coders[i] = coderRegistry.getDefaultCoder(
+          coders[i] = coderRegistry.getCoder(
               typeDescriptor, inputCoder.getEncodedTypeDescriptor(), inputCoder);
         } catch (CannotProvideCoderException ignored) {
           // Since not all type arguments will have a registered coder we ignore this exception.
@@ -623,7 +623,7 @@ public class ParDo {
     @SuppressWarnings("unchecked")
     protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input)
         throws CannotProvideCoderException {
-      return input.getPipeline().getCoderRegistry().getDefaultCoder(
+      return input.getPipeline().getCoderRegistry().getCoder(
           getFn().getOutputTypeDescriptor(),
           getFn().getInputTypeDescriptor(),
           ((PCollection<InputT>) input).getCoder());
@@ -767,7 +767,7 @@ public class ParDo {
         throws CannotProvideCoderException {
       @SuppressWarnings("unchecked")
       Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
-      return input.getPipeline().getCoderRegistry().getDefaultCoder(
+      return input.getPipeline().getCoderRegistry().getCoder(
           output.getTypeDescriptor(),
           getFn().getInputTypeDescriptor(),
           inputCoder);

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index dd38006..c66d1b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -124,9 +124,9 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
       Coder<K> keyCoder;
       CoderRegistry coderRegistry = in.getPipeline().getCoderRegistry();
       if (keyClass == null) {
-        keyCoder = coderRegistry.getDefaultOutputCoder(fn, in.getCoder());
+        keyCoder = coderRegistry.getOutputCoder(fn, in.getCoder());
       } else {
-        keyCoder = coderRegistry.getDefaultCoder(TypeDescriptor.of(keyClass));
+        keyCoder = coderRegistry.getCoder(TypeDescriptor.of(keyClass));
       }
       // TODO: Remove when we can set the coder inference context.
       result.setCoder(KvCoder.of(keyCoder, in.getCoder()));

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index 0276ba6..0bfb875 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -87,12 +87,6 @@ public class GlobalWindow extends BoundedWindow {
       return Collections.emptyList();
     }
 
-    /**
-     * Returns an empty list. The Global Window Coder has no components.
-     */
-    public static <T> List<Object> getInstanceComponents(T exampleValue) {
-      return Collections.emptyList();
-    }
     private Coder() {}
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index fd2a2d8..46ece09 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -178,13 +178,6 @@ public class IntervalWindow extends BoundedWindow
       return INSTANCE;
     }
 
-    /**
-     * Returns an empty list. {@link IntervalWindowCoder} has no components.
-     */
-    public static <T> List<Object> getInstanceComponents(T value) {
-      return Collections.emptyList();
-    }
-
     @Override
     public void encode(IntervalWindow window, OutputStream outStream, Context context)
         throws IOException, CoderException {

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 1095fb8..f210fd8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -142,7 +142,7 @@ public class PCollection<T> extends PValueBase implements PValue {
     CannotProvideCoderException inferFromTokenException = null;
     if (token != null) {
       try {
-        return new CoderOrFailure<>(registry.getDefaultCoder(token), null);
+        return new CoderOrFailure<>(registry.getCoder(token), null);
       } catch (CannotProvideCoderException exc) {
         inferFromTokenException = exc;
         // Attempt to detect when the token came from a TupleTag used for a ParDo output,

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index a9f3929..c172885 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -132,10 +132,6 @@ public class TimestampedValue<V> {
       return valueCoder;
     }
 
-    public static <T> List<Object> getInstanceComponents(TimestampedValue<T> exampleValue) {
-      return Arrays.<Object>asList(exampleValue.getValue());
-    }
-
     @Override
     public TypeDescriptor<TimestampedValue<T>> getEncodedTypeDescriptor() {
       return new TypeDescriptor<TimestampedValue<T>>() {}.where(

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java
deleted file mode 100644
index 4ffc9c1..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.Collections;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link CoderFactories}.
- */
-@RunWith(JUnit4.class)
-public class CoderFactoriesTest {
-
-  /**
-   * Ensures that a few of our standard atomic coder classes
-   * can each be built into a factory that works as expected.
-   * It is presumed that testing a few, not all, suffices to
-   * exercise CoderFactoryFromStaticMethods.
-   */
-  @Test
-  public void testAtomicCoderClassFactories() {
-    checkAtomicCoderFactory(StringUtf8Coder.class, StringUtf8Coder.of());
-    checkAtomicCoderFactory(DoubleCoder.class, DoubleCoder.of());
-    checkAtomicCoderFactory(ByteArrayCoder.class, ByteArrayCoder.of());
-  }
-
-  /**
-   * Checks that {#link CoderFactories.fromStaticMethods} successfully
-   * builds a working {@link CoderFactory} from {@link KvCoder KvCoder.class}.
-   */
-  @Test
-  public void testKvCoderFactory() {
-    CoderFactory kvCoderFactory = CoderFactories.fromStaticMethods(KvCoder.class);
-    assertEquals(
-        KvCoder.of(DoubleCoder.of(), DoubleCoder.of()),
-        kvCoderFactory.create(Arrays.asList(DoubleCoder.of(), DoubleCoder.of())));
-  }
-
-  /**
-   * Checks that {#link CoderFactories.fromStaticMethods} successfully
-   * builds a working {@link CoderFactory} from {@link ListCoder ListCoder.class}.
-   */
-  @Test
-  public void testListCoderFactory() {
-    CoderFactory listCoderFactory = CoderFactories.fromStaticMethods(ListCoder.class);
-
-    assertEquals(
-        ListCoder.of(DoubleCoder.of()),
-        listCoderFactory.create(Arrays.asList(DoubleCoder.of())));
-  }
-
-  /**
-   * Checks that {#link CoderFactories.fromStaticMethods} successfully
-   * builds a working {@link CoderFactory} from {@link IterableCoder IterableCoder.class}.
-   */
-  @Test
-  public void testIterableCoderFactory() {
-    CoderFactory iterableCoderFactory = CoderFactories.fromStaticMethods(IterableCoder.class);
-
-    assertEquals(
-        IterableCoder.of(DoubleCoder.of()),
-        iterableCoderFactory.create(Arrays.asList(DoubleCoder.of())));
-  }
-
-  ///////////////////////////////////////////////////////////////////////
-
-  /**
-   * Checks that an atomic coder class can be converted into
-   * a factory that then yields a coder equal to the example
-   * provided.
-   */
-  private <T> void checkAtomicCoderFactory(
-      Class<? extends Coder<T>> coderClazz,
-      Coder<T> expectedCoder) {
-    CoderFactory factory = CoderFactories.fromStaticMethods(coderClazz);
-    @SuppressWarnings("unchecked")
-    Coder<T> actualCoder = (Coder<T>) factory.create(Collections.<Coder<?>>emptyList());
-    assertEquals(expectedCoder, actualCoder);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
index 44be56d..2aa8351 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
@@ -17,54 +17,78 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
 
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
-import org.junit.Rule;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests for {@link CoderFactories}.
+ * Tests for {@link CoderProviders}.
  */
 @RunWith(JUnit4.class)
 public class CoderProvidersTest {
+  @Test
+  public void testCoderProvidersFromStaticMethodsForParameterlessTypes() throws Exception {
+    CoderProvider factory = CoderProviders.fromStaticMethods(String.class, StringUtf8Coder.class);
+    assertEquals(StringUtf8Coder.of(),
+        factory.coderFor(TypeDescriptors.strings(), Collections.<Coder<?>>emptyList()));
 
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
+    factory = CoderProviders.fromStaticMethods(Double.class, DoubleCoder.class);
+    assertEquals(DoubleCoder.of(),
+        factory.coderFor(TypeDescriptors.doubles(), Collections.<Coder<?>>emptyList()));
 
-  @Test
-  public void testAvroThenSerializableStringMap() throws Exception {
-    CoderProvider provider = CoderProviders.firstOf(AvroCoder.PROVIDER, SerializableCoder.PROVIDER);
-    Coder<Map<String, String>> coder =
-        provider.getCoder(new TypeDescriptor<Map<String, String>>(){});
-    assertThat(coder, instanceOf(AvroCoder.class));
+    factory = CoderProviders.fromStaticMethods(byte[].class, ByteArrayCoder.class);
+    assertEquals(ByteArrayCoder.of(),
+        factory.coderFor(TypeDescriptor.of(byte[].class), Collections.<Coder<?>>emptyList()));
   }
 
+  /**
+   * Checks that {#link CoderProviders.fromStaticMethods} successfully
+   * builds a working {@link CoderProvider} from {@link KvCoder KvCoder.class}.
+   */
   @Test
-  public void testThrowingThenSerializable() throws Exception {
-    CoderProvider provider =
-        CoderProviders.firstOf(new ThrowingCoderProvider(), SerializableCoder.PROVIDER);
-    Coder<Integer> coder = provider.getCoder(new TypeDescriptor<Integer>(){});
-    assertThat(coder, instanceOf(SerializableCoder.class));
+  public void testKvCoderProvider() throws Exception {
+    TypeDescriptor<KV<Double, Double>> type =
+        TypeDescriptors.kvs(TypeDescriptors.doubles(), TypeDescriptors.doubles());
+    CoderProvider kvCoderProvider = CoderProviders.fromStaticMethods(KV.class, KvCoder.class);
+    assertEquals(
+        KvCoder.of(DoubleCoder.of(), DoubleCoder.of()),
+        kvCoderProvider.coderFor(type, Arrays.asList(DoubleCoder.of(), DoubleCoder.of())));
   }
 
+  /**
+   * Checks that {#link CoderProviders.fromStaticMethods} successfully
+   * builds a working {@link CoderProvider} from {@link ListCoder ListCoder.class}.
+   */
   @Test
-  public void testNullThrows() throws Exception {
-    CoderProvider provider = CoderProviders.firstOf(new ThrowingCoderProvider());
-    thrown.expect(CannotProvideCoderException.class);
-    thrown.expectMessage("ThrowingCoderProvider");
-    provider.getCoder(new TypeDescriptor<Integer>(){});
+  public void testListCoderProvider() throws Exception {
+    TypeDescriptor<List<Double>> type = TypeDescriptors.lists(TypeDescriptors.doubles());
+    CoderProvider listCoderProvider = CoderProviders.fromStaticMethods(List.class, ListCoder.class);
+
+    assertEquals(
+        ListCoder.of(DoubleCoder.of()),
+        listCoderProvider.coderFor(type, Arrays.asList(DoubleCoder.of())));
   }
 
-  private static class ThrowingCoderProvider implements CoderProvider {
-    @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-      throw new CannotProvideCoderException("ThrowingCoderProvider cannot ever provide a Coder");
-    }
+  /**
+   * Checks that {#link CoderProviders.fromStaticMethods} successfully
+   * builds a working {@link CoderProvider} from {@link IterableCoder IterableCoder.class}.
+   */
+  @Test
+  public void testIterableCoderProvider() throws Exception {
+    TypeDescriptor<Iterable<Double>> type = TypeDescriptors.iterables(TypeDescriptors.doubles());
+    CoderProvider iterableCoderProvider =
+        CoderProviders.fromStaticMethods(Iterable.class, IterableCoder.class);
+
+    assertEquals(
+        IterableCoder.of(DoubleCoder.of()),
+        iterableCoderProvider.coderFor(type, Arrays.asList(DoubleCoder.of())));
   }
 }