You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/05 21:45:15 UTC

[3/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types

[BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f8e2cf89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f8e2cf89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f8e2cf89

Branch: refs/heads/master
Commit: f8e2cf89febeb2f86d2dc91c8d3fff5d43df3623
Parents: 6505988
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 4 19:55:47 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri May 5 14:44:32 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/complete/TfIdf.java    |   2 +-
 .../beam/examples/complete/TfIdfTest.java       |   2 +-
 .../beam/runners/core/SplittableParDo.java      |   4 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java |   2 +-
 .../beam/runners/direct/DirectRunnerTest.java   |  10 -
 .../org/apache/beam/sdk/coders/AtomicCoder.java |  14 -
 .../org/apache/beam/sdk/coders/AvroCoder.java   |  42 +-
 .../apache/beam/sdk/coders/ByteArrayCoder.java  |   8 -
 .../sdk/coders/CannotProvideCoderException.java |   2 +-
 .../java/org/apache/beam/sdk/coders/Coder.java  |   4 +-
 .../apache/beam/sdk/coders/CoderFactories.java  | 292 ---------
 .../apache/beam/sdk/coders/CoderFactory.java    |  44 --
 .../apache/beam/sdk/coders/CoderProvider.java   |  19 +-
 .../beam/sdk/coders/CoderProviderRegistrar.java |  42 ++
 .../apache/beam/sdk/coders/CoderProviders.java  | 240 +++----
 .../apache/beam/sdk/coders/CoderRegistrar.java  |  45 --
 .../apache/beam/sdk/coders/CoderRegistry.java   | 618 +++++++------------
 .../apache/beam/sdk/coders/CollectionCoder.java |   9 -
 .../org/apache/beam/sdk/coders/CustomCoder.java |   8 -
 .../apache/beam/sdk/coders/DefaultCoder.java    | 119 +++-
 .../apache/beam/sdk/coders/IterableCoder.java   |   9 -
 .../beam/sdk/coders/IterableLikeCoder.java      |  12 -
 .../org/apache/beam/sdk/coders/KvCoder.java     |   7 -
 .../org/apache/beam/sdk/coders/ListCoder.java   |   8 -
 .../org/apache/beam/sdk/coders/MapCoder.java    |  12 -
 .../beam/sdk/coders/SerializableCoder.java      |  57 +-
 .../org/apache/beam/sdk/coders/SetCoder.java    |   9 -
 .../apache/beam/sdk/coders/VarLongCoder.java    |   7 -
 .../beam/sdk/transforms/CombineFnBase.java      |   4 +-
 .../apache/beam/sdk/transforms/CombineFns.java  |   4 +-
 .../org/apache/beam/sdk/transforms/Create.java  |  76 ++-
 .../apache/beam/sdk/transforms/PTransform.java  |   4 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   8 +-
 .../apache/beam/sdk/transforms/WithKeys.java    |   4 +-
 .../sdk/transforms/windowing/GlobalWindow.java  |   6 -
 .../transforms/windowing/IntervalWindow.java    |   7 -
 .../org/apache/beam/sdk/values/PCollection.java |   2 +-
 .../beam/sdk/values/TimestampedValue.java       |   4 -
 .../beam/sdk/coders/CoderFactoriesTest.java     | 100 ---
 .../beam/sdk/coders/CoderProvidersTest.java     |  82 ++-
 .../beam/sdk/coders/CoderRegistryTest.java      | 167 ++---
 .../beam/sdk/coders/DefaultCoderTest.java       |  65 +-
 .../beam/sdk/coders/IterableCoderTest.java      |  17 -
 .../apache/beam/sdk/coders/ListCoderTest.java   |  17 -
 .../apache/beam/sdk/coders/MapCoderTest.java    |  20 -
 .../beam/sdk/coders/SerializableCoderTest.java  |  11 +
 .../beam/sdk/transforms/CombineFnsTest.java     |  10 +-
 .../apache/beam/sdk/transforms/CreateTest.java  |   6 +-
 .../sdk/transforms/FlatMapElementsTest.java     |   4 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |   4 +-
 .../beam/sdk/transforms/MapElementsTest.java    |   4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  17 +-
 .../transforms/reflect/DoFnInvokersTest.java    |   4 +-
 .../apache/beam/sdk/values/TypedPValueTest.java |  28 +-
 .../sdk/extensions/protobuf/ProtoCoder.java     |  72 ++-
 .../ProtobufCoderProviderRegistrar.java         |  41 ++
 .../protobuf/ProtobufCoderRegistrar.java        |  39 --
 .../sdk/extensions/protobuf/ProtoCoderTest.java |   7 +-
 .../BigQueryCoderProviderRegistrar.java         |  40 ++
 .../io/gcp/bigquery/BigQueryCoderRegistrar.java |  39 --
 .../io/gcp/bigquery/DynamicDestinations.java    |   2 +-
 .../pubsub/PubsubCoderProviderRegistrar.java    |  37 ++
 .../sdk/io/gcp/pubsub/PubsubCoderRegistrar.java |  35 --
 .../BigQueryCoderProviderRegistrarTest.java     |  40 ++
 .../bigquery/BigQueryCoderRegistrarTest.java    |  40 --
 sdks/java/io/hadoop-common/pom.xml              |   7 +
 .../beam/sdk/io/hadoop/WritableCoder.java       |  56 ++
 .../beam/sdk/io/hadoop/WritableCoderTest.java   |  10 +
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |   2 +-
 .../beam/sdk/transforms/DistinctJava8Test.java  |   8 -
 .../beam/sdk/transforms/FilterJava8Test.java    |   2 +-
 .../beam/sdk/transforms/PartitionJava8Test.java |   2 +-
 .../beam/sdk/transforms/WithKeysJava8Test.java  |   9 -
 73 files changed, 1121 insertions(+), 1689 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 6fd9755..7552b94 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -409,7 +409,7 @@ public class TfIdf {
   public static void main(String[] args) throws Exception {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     Pipeline pipeline = Pipeline.create(options);
-    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+    pipeline.getCoderRegistry().registerCoderForClass(URI.class, StringDelegateCoder.of(URI.class));
 
     pipeline
         .apply(new ReadDocuments(listInputDocuments(options)))

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
index d263643..3681ff5 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
@@ -48,7 +48,7 @@ public class TfIdfTest {
   @Category(ValidatesRunner.class)
   public void testTfIdf() throws Exception {
 
-    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+    pipeline.getCoderRegistry().registerCoderForClass(URI.class, StringDelegateCoder.of(URI.class));
 
     PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
         .apply(Create.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index a4d9639..6503fa2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -274,7 +274,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
             input,
         PCollection<T> output)
         throws CannotProvideCoderException {
-      // Similar logic to ParDo.MultiOutput.getDefaultOutputCoder.
+      // Similar logic to ParDo.MultiOutput.getOutputCoder.
       @SuppressWarnings("unchecked")
       KeyedWorkItemCoder<String, ElementAndRestriction<InputT, RestrictionT>> kwiCoder =
           (KeyedWorkItemCoder) input.getCoder();
@@ -284,7 +284,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       return input
           .getPipeline()
           .getCoderRegistry()
-          .getDefaultCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder);
+          .getCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 46f26a1..1e60ca3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -229,7 +229,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
     CoderRegistry reg = pipeline.getCoderRegistry();
     StateTag<CombiningState<Long, long[], Long>> stateTag =
         StateTags.combiningValue("summer",
-            sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
+            sumLongFn.getAccumulatorCoder(reg, reg.getCoder(Long.class)), sumLongFn);
     GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
     assertThat(underlyingValue.read(), equalTo(0L));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 0fe9585..85e55eb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -577,14 +577,4 @@ public class DirectRunnerTest implements Serializable {
       return underlying.getDefaultOutputCoder();
     }
   }
-
-  @Test
-  public void fallbackCoderProviderAllowsInference() {
-    // See https://issues.apache.org/jira/browse/BEAM-1642
-    Pipeline p = getPipeline();
-    p.getCoderRegistry().setFallbackCoderProvider(
-        org.apache.beam.sdk.coders.AvroCoder.PROVIDER);
-    p.apply(Create.of(Arrays.asList(100, 200))).apply(Count.<Integer>globally());
-    p.run().waitUntilFinish();
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
index 528cfb0..043fe93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
@@ -33,20 +33,6 @@ import java.util.List;
  */
 public abstract class AtomicCoder<T> extends StructuredCoder<T> {
   /**
-   * Returns an empty list.
-   *
-   * <p>{@link CoderFactories#fromStaticMethods(Class)} builds a {@link CoderFactory} from the
-   * {@code #of()} method and this method, used to determine the components of an object. Because
-   * {@link AtomicCoder} has no components, always returns an empty list.
-   *
-   * @param exampleValue unused, but part of the latent interface expected by {@link
-   *     CoderFactories#fromStaticMethods}
-   */
-  public static <T> List<Object> getInstanceComponents(T exampleValue) {
-    return Collections.emptyList();
-  }
-
-  /**
    * {@inheritDoc}.
    *
    * @throws NonDeterministicException

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 2aa2b44..f82c065 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import javax.annotation.Nullable;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -140,18 +141,39 @@ public class AvroCoder<T> extends CustomCoder<T> {
     return new AvroCoder<>(type, schema);
   }
 
-  public static final CoderProvider PROVIDER = new CoderProvider() {
+  /**
+   * Returns a {@link CoderProvider} which uses the {@link AvroCoder} if possible for
+   * all types.
+   *
+   * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively
+   * accept dangerous types such as {@link Object}.
+   *
+   * <p>This method is invoked reflectively from {@link DefaultCoder}.
+   */
+  @SuppressWarnings("unused")
+  public static CoderProvider getCoderProvider() {
+    return new AvroCoderProvider();
+  }
+
+  /**
+   * A {@link CoderProvider} that constructs an {@link AvroCoder} for Avro compatible classes.
+   *
+   * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively
+   * accept dangerous types such as {@link Object}.
+   */
+  static class AvroCoderProvider extends CoderProvider {
     @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor) {
-      // This is a downcast from `? super T` to T. However, because
-      // it comes from a TypeDescriptor<T>, the class object itself
-      // is the same so the supertype in question shares the same
-      // generated AvroCoder schema.
-      @SuppressWarnings("unchecked")
-      Class<T> rawType = (Class<T>) typeDescriptor.getRawType();
-      return AvroCoder.of(rawType);
+    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+      try {
+        return AvroCoder.of(typeDescriptor);
+      } catch (AvroRuntimeException e) {
+        throw new CannotProvideCoderException(
+            String.format("%s is not compatible with Avro", typeDescriptor),
+            e);
+      }
     }
-  };
+  }
 
   private final Class<T> type;
   private final SerializableSchemaSupplier schemaSupplier;

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index 28cb627..d83d832 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -21,7 +21,6 @@ import com.google.common.io.ByteStreams;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
 import org.apache.beam.sdk.util.StreamUtils;
@@ -45,13 +44,6 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
     return INSTANCE;
   }
 
-  /**
-   * Returns an empty list. {@link ByteArrayCoder} has no components.
-   */
-  public static <T> List<Object> getInstanceComponents(T ignored) {
-    return Collections.emptyList();
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   private static final ByteArrayCoder INSTANCE = new ByteArrayCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
index c37ec00..bc2ef3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.coders;
 
 /**
- * The exception thrown when a {@link CoderProvider} cannot
+ * The exception thrown when a {@link CoderRegistry} or {@link CoderProvider} cannot
  * provide a {@link Coder} that has been requested.
  */
 public class CannotProvideCoderException extends Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 41e83ac..eeafbd2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -51,9 +51,9 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * <p>{@link Coder} classes for compound types are often composed from coder classes for types
  * contains therein. The composition of {@link Coder} instances into a coder for the compound
- * class is the subject of the {@link CoderFactory} type, which enables automatic generic
+ * class is the subject of the {@link CoderProvider} type, which enables automatic generic
  * composition of {@link Coder} classes within the {@link CoderRegistry}. With particular
- * static methods on a compound {@link Coder} class, a {@link CoderFactory} can be automatically
+ * static methods on a compound {@link Coder} class, a {@link CoderProvider} can be automatically
  * inferred. See {@link KvCoder} for an example of a simple compound {@link Coder} that supports
  * automatic composition in the {@link CoderRegistry}.
  *

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
deleted file mode 100644
index 4f05c95..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.MoreObjects;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * Static utility methods for creating and working with {@link Coder}s.
- */
-public final class CoderFactories {
-  private CoderFactories() { } // Static utility class
-
-  /**
-   * Creates a {@link CoderFactory} built from particular static methods of a class that
-   * implements {@link Coder}.
-   *
-   * <p>The class must have the following static methods:
-   *
-   * <ul>
-   * <li> {@code
-   * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...)
-   * }
-   * <li> {@code
-   * public static List<Object> getInstanceComponents(T exampleValue);
-   * }
-   * </ul>
-   *
-   * <p>The {@code of(...)} method will be used to construct a
-   * {@code Coder<T>} from component {@link Coder}s.
-   * It must accept one {@link Coder} argument for each
-   * generic type parameter of {@code T}. If {@code T} takes no generic
-   * type parameters, then the {@code of()} factory method should take
-   * no arguments.
-   *
-   * <p>The {@code getInstanceComponents} method will be used to
-   * decompose a value during the {@link Coder} inference process,
-   * to automatically choose coders for the components.
-   *
-   * <p>Note that the class {@code T} to be coded may be a
-   * not-yet-specialized generic class.
-   * For a generic class {@code MyClass<X>} and an actual type parameter
-   * {@code Foo}, the {@link CoderFactoryFromStaticMethods} will
-   * accept any {@code Coder<Foo>} and produce a {@code Coder<MyClass<Foo>>}.
-   *
-   * <p>For example, the {@link CoderFactory} returned by
-   * {@code fromStaticMethods(ListCoder.class)}
-   * will produce a {@code Coder<List<X>>} for any {@code Coder Coder<X>}.
-   */
-  public static <T> CoderFactory fromStaticMethods(Class<T> clazz) {
-    checkArgument(
-        Coder.class.isAssignableFrom(clazz),
-        "%s is not a subtype of %s",
-        clazz.getName(),
-        Coder.class.getSimpleName());
-    return new CoderFactoryFromStaticMethods((Class<? extends Coder>) clazz);
-  }
-
-  /**
-   * Creates a {@link CoderFactory} that always returns the
-   * given coder.
-   *
-   * <p>The {@code getInstanceComponents} method of this
-   * {@link CoderFactory} always returns an empty list.
-   */
-  public static <T> CoderFactory forCoder(Coder<T> coder) {
-    return new CoderFactoryForCoder<>(coder);
-  }
-
-  /**
-   * See {@link #fromStaticMethods} for a detailed description
-   * of the characteristics of this {@link CoderFactory}.
-   */
-  private static class CoderFactoryFromStaticMethods implements CoderFactory {
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
-      try {
-        return (Coder) factoryMethod.invoke(
-            null /* static */, componentCoders.toArray());
-      } catch (IllegalAccessException
-           | IllegalArgumentException
-           | InvocationTargetException
-           | NullPointerException
-           | ExceptionInInitializerError exn) {
-        throw new IllegalStateException(
-            "error when invoking Coder factory method " + factoryMethod,
-            exn);
-      }
-    }
-
-    @Override
-    public List<Object> getInstanceComponents(Object value) {
-      try {
-        @SuppressWarnings("unchecked")
-        List<Object> components =  (List<Object>) getComponentsMethod.invoke(
-            null /* static */, value);
-        return components;
-      } catch (IllegalAccessException
-          | IllegalArgumentException
-          | InvocationTargetException
-          | NullPointerException
-          | ExceptionInInitializerError exn) {
-        throw new IllegalStateException(
-            "error when invoking Coder getComponents method " + getComponentsMethod,
-            exn);
-      }
-    }
-
-    ////////////////////////////////////////////////////////////////////////////////
-
-    // Method to create a coder given component coders
-    // For a Coder class of kind * -> * -> ... n times ... -> *
-    // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T>
-    private final Method factoryMethod;
-
-    // Method to decompose a value of type T into its parts.
-    // For a Coder class of kind * -> * -> ... n times ... -> *
-    // this has type T -> List<Object>
-    // where the list has n elements.
-    private final Method getComponentsMethod;
-
-    /**
-     * Returns a CoderFactory that invokes the given static factory method
-     * to create the Coder.
-     */
-    private CoderFactoryFromStaticMethods(Class<? extends Coder> coderClazz) {
-      this.factoryMethod = getFactoryMethod(coderClazz);
-      this.getComponentsMethod = getInstanceComponentsMethod(coderClazz);
-    }
-
-    /**
-     * Returns the static {@code of} constructor method on {@code coderClazz}
-     * if it exists. It is assumed to have one {@link Coder} parameter for
-     * each type parameter of {@code coderClazz}.
-     */
-    private Method getFactoryMethod(Class<?> coderClazz) {
-      Method factoryMethodCandidate;
-
-      // Find the static factory method of coderClazz named 'of' with
-      // the appropriate number of type parameters.
-      int numTypeParameters = coderClazz.getTypeParameters().length;
-      Class<?>[] factoryMethodArgTypes = new Class<?>[numTypeParameters];
-      Arrays.fill(factoryMethodArgTypes, Coder.class);
-      try {
-        factoryMethodCandidate =
-            coderClazz.getDeclaredMethod("of", factoryMethodArgTypes);
-      } catch (NoSuchMethodException | SecurityException exn) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "does not have an accessible method named 'of' with "
-            + numTypeParameters + " arguments of Coder type",
-            exn);
-      }
-      if (!Modifier.isStatic(factoryMethodCandidate.getModifiers())) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "method named 'of' with " + numTypeParameters
-            + " arguments of Coder type is not static");
-      }
-      if (!coderClazz.isAssignableFrom(factoryMethodCandidate.getReturnType())) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "method named 'of' with " + numTypeParameters
-            + " arguments of Coder type does not return a " + coderClazz);
-      }
-      try {
-        if (!factoryMethodCandidate.isAccessible()) {
-          factoryMethodCandidate.setAccessible(true);
-        }
-      } catch (SecurityException exn) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "method named 'of' with " + numTypeParameters
-            + " arguments of Coder type is not accessible",
-            exn);
-      }
-
-      return factoryMethodCandidate;
-    }
-
-    /**
-     * Finds the static method on {@code coderType} to use
-     * to decompose a value of type {@code T} into components,
-     * each corresponding to an argument of the {@code of}
-     * method.
-     */
-    private <T, CoderT extends Coder> Method getInstanceComponentsMethod(Class<CoderT> coderClazz) {
-      TypeDescriptor<CoderT> coderType = TypeDescriptor.of(coderClazz);
-      TypeDescriptor<T> argumentType = getCodedType(coderType);
-
-      // getInstanceComponents may be implemented in a superclass,
-      // so we search them all for an applicable method. We do not
-      // try to be clever about finding the best overload. It may
-      // be in a generic superclass, erased to accept an Object.
-      // However, subtypes are listed before supertypes (it is a
-      // topological ordering) so probably the best one will be chosen
-      // if there are more than one (which should be rare)
-      for (TypeDescriptor<?> supertype : coderType.getClasses()) {
-        for (Method method : supertype.getRawType().getDeclaredMethods()) {
-          if (method.getName().equals("getInstanceComponents")) {
-            TypeDescriptor<?> formalArgumentType = supertype.getArgumentTypes(method).get(0);
-            if (formalArgumentType.getRawType().isAssignableFrom(argumentType.getRawType())) {
-              return method;
-            }
-          }
-        }
-      }
-
-      throw new IllegalArgumentException(
-          "cannot create a CoderFactory from " + coderType + ": "
-          + "does not have an accessible method "
-          + "'getInstanceComponents'");
-    }
-
-    /**
-     * If {@code coderType} is a subclass of {@link Coder} for a specific type {@code T}, returns
-     * {@code T.class}. Otherwise, raises IllegalArgumentException.
-     */
-    private <T, CoderT extends Coder> TypeDescriptor<T> getCodedType(
-        TypeDescriptor<CoderT> coderType) {
-      TypeDescriptor<? super CoderT> coderSupertype = coderType.getSupertype(Coder.class);
-      ParameterizedType coderIface = (ParameterizedType) coderSupertype.getType();
-      @SuppressWarnings("unchecked")
-      TypeDescriptor<T> token =
-          (TypeDescriptor<T>) TypeDescriptor.of(coderIface.getActualTypeArguments()[0]);
-      return token;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("factoryMethod", factoryMethod)
-          .add("getComponentsMethod", getComponentsMethod)
-          .toString();
-    }
-  }
-
-  /**
-   * See {@link #forCoder} for a detailed description of this
-   * {@link CoderFactory}.
-   */
-  private static class CoderFactoryForCoder<T> implements CoderFactory {
-    private final Coder<T> coder;
-
-    public CoderFactoryForCoder(Coder<T> coder) {
-      this.coder = coder;
-    }
-
-    @Override
-    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
-      return this.coder;
-    }
-
-    @Override
-    public List<Object> getInstanceComponents(Object value) {
-      return Collections.emptyList();
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("coder", coder)
-          .toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
deleted file mode 100644
index 22d03fa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import java.util.List;
-
-/**
- * A {@link CoderFactory} creates coders and decomposes values.
- * It may operate on a parameterized type, such as {@link List},
- * in which case the {@link #create} method accepts a list of
- * coders to use for the type parameters.
- */
-public interface CoderFactory {
-
-  /**
-   * Returns a {@code Coder<?>}, given argument coder to use for
-   * values of a particular type, given the Coders for each of
-   * the type's generic parameter types.
-   */
-  Coder<?> create(List<? extends Coder<?>> componentCoders);
-
-  /**
-   * Returns a list of objects contained in {@code value}, one per
-   * type argument, or {@code null} if none can be determined.
-   * The list of returned objects should be the same size as the
-   * list of coders required by {@link #create}.
-   */
-  List<Object> getInstanceComponents(Object value);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
index 0db73eb..ac042ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
@@ -17,18 +17,25 @@
  */
 package org.apache.beam.sdk.coders;
 
+import java.util.List;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
- * A {@link CoderProvider} may create a {@link Coder} for
- * any concrete class.
+ * A {@link CoderProvider} provides {@link Coder}s.
+ *
+ * <p>It may operate on a parameterized type, such as {@link List}, in which case the
+ * {@link #coderFor} method accepts a list of coders to use for the type parameters.
  */
-public interface CoderProvider {
+public abstract class CoderProvider {
 
   /**
-   * Provides a coder for a given class, if possible.
+   * Returns a {@code Coder<T>} to use for values of a particular type, given the Coders for each of
+   * the type's generic parameter types.
    *
-   * @throws CannotProvideCoderException if no coder can be provided
+   * <p>Throws {@link CannotProvideCoderException} if this {@link CoderProvider} cannot provide
+   * a coder for this type and components.
    */
-  <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException;
+  public abstract <T> Coder<T> coderFor(
+      TypeDescriptor<T> typeDescriptor, List<? extends Coder<?>> componentCoders)
+      throws CannotProvideCoderException;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
new file mode 100644
index 0000000..35d061d
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import com.google.auto.service.AutoService;
+import java.util.List;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * {@link Coder} creators have the ability to automatically have their
+ * {@link Coder coders} registered with this SDK by creating a {@link ServiceLoader} entry
+ * and a concrete implementation of this interface.
+ *
+ * <p>It is optional but recommended to use one of the many build time tools such as
+ * {@link AutoService} to generate the necessary META-INF files automatically.
+ */
+@Experimental
+public interface CoderProviderRegistrar {
+  /**
+   * Returns a list of {@link CoderProvider coder providers} which
+   * will be registered by default within each {@link CoderRegistry coder registry} instance.
+   *
+   * <p>See {@link CoderProviders} for convenience methods to construct a {@link CoderProvider}.
+   */
+  List<CoderProvider> getCoderProviders();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
index c072008..414fd8b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
@@ -19,146 +19,178 @@ package org.apache.beam.sdk.coders;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import com.google.common.base.MoreObjects;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
- * Static utility methods for working with {@link CoderProvider CoderProviders}.
+ * Static utility methods for creating and working with {@link CoderProvider}s.
  */
 public final class CoderProviders {
-
-  // Static utility class
-  private CoderProviders() { }
+  private CoderProviders() { } // Static utility class
 
   /**
-   * Creates a {@link CoderProvider} built from particular static methods of a class that
-   * implements {@link Coder}. The requirements for this method are precisely the requirements
-   * for a {@link Coder} class to be usable with {@link DefaultCoder} annotations.
-   *
-   * <p>The class must have the following static method:
-   *
-   * <pre>{@code
-   * public static Coder<T> of(TypeDescriptor<T> type)
-   * }
-   * </pre>
+   * Creates a {@link CoderProvider} from a coder class's static {@code of} factory method, which
+   * must take one {@link Coder} argument for each type parameter of the coder class.
    */
-  public static <T> CoderProvider fromStaticMethods(Class<T> clazz) {
-    return new CoderProviderFromStaticMethods(clazz);
+  public static CoderProvider fromStaticMethods(Class<?> rawType, Class<?> coderClazz) {
+    checkArgument(
+        Coder.class.isAssignableFrom(coderClazz),
+        "%s is not a subtype of %s",
+        coderClazz.getName(),
+        Coder.class.getSimpleName());
+    return new CoderProviderFromStaticMethods(rawType, coderClazz);
   }
 
-
   /**
-   * Returns a {@link CoderProvider} that consults each of the provider {@code coderProviders}
-   * and returns the first {@link Coder} provided.
-   *
-   * <p>Note that the order in which the providers are listed matters: While the set of types
-   * handled will be the union of those handled by all of the providers in the list, the actual
-   * {@link Coder} provided by the first successful provider may differ, and may have inferior
-   * properties. For example, not all {@link Coder Coders} are deterministic, handle {@code null}
-   * values, or have comparable performance.
+   * Creates a {@link CoderProvider} that always returns the
+   * given coder for the specified type.
    */
-  public static CoderProvider firstOf(CoderProvider... coderProviders) {
-    return new FirstOf(ImmutableList.copyOf(coderProviders));
+  public static CoderProvider forCoder(TypeDescriptor<?> type, Coder<?> coder) {
+    return new CoderProviderForCoder(type, coder);
   }
 
-  ///////////////////////////////////////////////////////////////////////////////////////////////
-
   /**
-   * @see #firstOf
+   * See {@link #fromStaticMethods} for a detailed description
+   * of the characteristics of this {@link CoderProvider}.
    */
-  private static class FirstOf implements CoderProvider {
-
-    private Iterable<CoderProvider> providers;
-
-    public FirstOf(Iterable<CoderProvider> providers) {
-      this.providers = providers;
-    }
+  private static class CoderProviderFromStaticMethods extends CoderProvider {
 
     @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-      List<String> messages = Lists.newArrayList();
-      for (CoderProvider provider : providers) {
-        try {
-          return provider.getCoder(type);
-        } catch (CannotProvideCoderException exc) {
-          messages.add(String.format("%s could not provide a Coder for type %s: %s",
-              provider, type, exc.getMessage()));
-        }
+    public <T> Coder<T> coderFor(TypeDescriptor<T> type, List<? extends Coder<?>> componentCoders)
+        throws CannotProvideCoderException {
+      if (!this.rawType.equals(type.getRawType())) {
+        throw new CannotProvideCoderException(String.format(
+            "Unable to provide coder for %s, this factory can only provide coders for %s",
+            type,
+            this.rawType));
+      }
+      try {
+        return (Coder) factoryMethod.invoke(
+            null /* static */, componentCoders.toArray());
+      } catch (IllegalAccessException
+           | IllegalArgumentException
+           | InvocationTargetException
+           | NullPointerException
+           | ExceptionInInitializerError exn) {
+        throw new IllegalStateException(
+            "error when invoking Coder factory method " + factoryMethod,
+            exn);
       }
-      throw new CannotProvideCoderException(
-          String.format("Cannot provide coder for type %s: %s.",
-              type, Joiner.on("; ").join(messages)));
     }
-  }
 
-  private static class CoderProviderFromStaticMethods implements CoderProvider {
+    ////////////////////////////////////////////////////////////////////////////////
 
-    /** If true, then clazz has {@code of(TypeDescriptor)}. If false, {@code of(Class)}. */
-    private final boolean takesTypeDescriptor;
-    private final Class<?> clazz;
+    // The raw type used to filter the incoming type.
+    private final Class<?> rawType;
 
-    public CoderProviderFromStaticMethods(Class<?> clazz) {
-      // Note that the second condition supports older classes, which only needed to provide
-      // of(Class), not of(TypeDescriptor). Our own classes have updated to accept a
-      // TypeDescriptor. Hence the error message points only to the current specification,
-      // not both acceptable conditions.
-      checkArgument(classTakesTypeDescriptor(clazz) || classTakesClass(clazz),
-          "Class " + clazz.getCanonicalName()
-          + " is missing required static method of(TypeDescriptor).");
+    // Method to create a coder given component coders
+    // For a Coder class of kind * -> * -> ... n times ... -> *
+    // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T>
+    private final Method factoryMethod;
 
-      this.takesTypeDescriptor = classTakesTypeDescriptor(clazz);
-      this.clazz = clazz;
+    /**
+     * Creates a CoderProvider that invokes the static {@code of} factory method
+     * of {@code coderClazz} to create the Coder.
+     */
+    private CoderProviderFromStaticMethods(Class<?> rawType, Class<?> coderClazz) {
+      this.rawType = rawType;
+      this.factoryMethod = getFactoryMethod(coderClazz);
     }
 
-    @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
+    /**
+     * Returns the static {@code of} constructor method on {@code coderClazz}
+     * if it exists. It is assumed to have one {@link Coder} parameter for
+     * each type parameter of {@code coderClazz}.
+     */
+    private Method getFactoryMethod(Class<?> coderClazz) {
+      Method factoryMethodCandidate;
+
+      // Find the static factory method of coderClazz named 'of' with
+      // the appropriate number of type parameters.
+      int numTypeParameters = coderClazz.getTypeParameters().length;
+      Class<?>[] factoryMethodArgTypes = new Class<?>[numTypeParameters];
+      Arrays.fill(factoryMethodArgTypes, Coder.class);
       try {
-        if (takesTypeDescriptor) {
-          @SuppressWarnings("unchecked")
-          Coder<T> result = InstanceBuilder.ofType(Coder.class)
-              .fromClass(clazz)
-              .fromFactoryMethod("of")
-              .withArg(TypeDescriptor.class, type)
-              .build();
-          return result;
-        } else {
-          @SuppressWarnings("unchecked")
-          Coder<T> result = InstanceBuilder.ofType(Coder.class)
-              .fromClass(clazz)
-              .fromFactoryMethod("of")
-              .withArg(Class.class, type.getRawType())
-              .build();
-          return result;
-        }
-      } catch (RuntimeException exc) {
-        if (exc.getCause() instanceof InvocationTargetException) {
-          throw new CannotProvideCoderException(exc.getCause().getCause());
+        factoryMethodCandidate =
+            coderClazz.getDeclaredMethod("of", factoryMethodArgTypes);
+      } catch (NoSuchMethodException | SecurityException exn) {
+        throw new IllegalArgumentException(
+            "cannot register Coder " + coderClazz + ": "
+            + "does not have an accessible method named 'of' with "
+            + numTypeParameters + " arguments of Coder type",
+            exn);
+      }
+      if (!Modifier.isStatic(factoryMethodCandidate.getModifiers())) {
+        throw new IllegalArgumentException(
+            "cannot register Coder " + coderClazz + ": "
+            + "method named 'of' with " + numTypeParameters
+            + " arguments of Coder type is not static");
+      }
+      if (!coderClazz.isAssignableFrom(factoryMethodCandidate.getReturnType())) {
+        throw new IllegalArgumentException(
+            "cannot register Coder " + coderClazz + ": "
+            + "method named 'of' with " + numTypeParameters
+            + " arguments of Coder type does not return a " + coderClazz);
+      }
+      try {
+        if (!factoryMethodCandidate.isAccessible()) {
+          factoryMethodCandidate.setAccessible(true);
         }
-        throw exc;
+      } catch (SecurityException exn) {
+        throw new IllegalArgumentException(
+            "cannot register Coder " + coderClazz + ": "
+            + "method named 'of' with " + numTypeParameters
+            + " arguments of Coder type is not accessible",
+            exn);
       }
+
+      return factoryMethodCandidate;
     }
 
-    private boolean classTakesTypeDescriptor(Class<?> clazz) {
-      try {
-        clazz.getDeclaredMethod("of", TypeDescriptor.class);
-        return true;
-      } catch (NoSuchMethodException | SecurityException exc) {
-        return false;
-      }
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("rawType", rawType)
+          .add("factoryMethod", factoryMethod)
+          .toString();
     }
+  }
 
-    private boolean classTakesClass(Class<?> clazz) {
-      try {
-        clazz.getDeclaredMethod("of", Class.class);
-        return true;
-      } catch (NoSuchMethodException | SecurityException exc) {
-        return false;
+  /**
+   * See {@link #forCoder} for a detailed description of this {@link CoderProvider}.
+   */
+  private static class CoderProviderForCoder extends CoderProvider {
+    private final Coder<?> coder;
+    private final TypeDescriptor<?> type;
+
+    public CoderProviderForCoder(TypeDescriptor<?> type, Coder<?> coder){
+      this.type = type;
+      this.coder = coder;
+    }
+
+    @Override
+    public <T> Coder<T> coderFor(TypeDescriptor<T> type, List<? extends Coder<?>> componentCoders)
+        throws CannotProvideCoderException {
+      if (!this.type.equals(type)) {
+        throw new CannotProvideCoderException(String.format(
+            "Unable to provide coder for %s, this factory can only provide coders for %s",
+            type,
+            this.type));
       }
+      return (Coder) coder;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("type", type)
+          .add("coder", coder)
+          .toString();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
deleted file mode 100644
index fced976..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import java.util.ServiceLoader;
-import org.apache.beam.sdk.annotations.Experimental;
-
-/**
- * {@link Coder} creators have the ability to automatically have their
- * {@link Coder coders} registered with this SDK by creating a {@link ServiceLoader} entry
- * and a concrete implementation of this interface.
- *
- * <p>It is optional but recommended to use one of the many build time tools such as
- * {@link AutoService} to generate the necessary META-INF files automatically.
- */
-@Experimental
-public interface CoderRegistrar {
-  /**
-   * Returns a mapping of {@link Class classes} to {@link CoderFactory coder factories} which
-   * will be registered by default within each {@link CoderRegistry coder registry} instance.
-   *
-   * <p>See {@link CoderFactories} for convenience methods to construct a {@link CoderFactory}.
-   *
-   * <p>Note that a warning is logged if multiple {@link CoderRegistrar coder registrars} provide
-   * mappings for the same {@link Class}.
-   */
-  Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses();
-}