You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/05 21:45:15 UTC
[3/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types
[BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f8e2cf89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f8e2cf89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f8e2cf89
Branch: refs/heads/master
Commit: f8e2cf89febeb2f86d2dc91c8d3fff5d43df3623
Parents: 6505988
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 4 19:55:47 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri May 5 14:44:32 2017 -0700
----------------------------------------------------------------------
.../apache/beam/examples/complete/TfIdf.java | 2 +-
.../beam/examples/complete/TfIdfTest.java | 2 +-
.../beam/runners/core/SplittableParDo.java | 4 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 2 +-
.../beam/runners/direct/DirectRunnerTest.java | 10 -
.../org/apache/beam/sdk/coders/AtomicCoder.java | 14 -
.../org/apache/beam/sdk/coders/AvroCoder.java | 42 +-
.../apache/beam/sdk/coders/ByteArrayCoder.java | 8 -
.../sdk/coders/CannotProvideCoderException.java | 2 +-
.../java/org/apache/beam/sdk/coders/Coder.java | 4 +-
.../apache/beam/sdk/coders/CoderFactories.java | 292 ---------
.../apache/beam/sdk/coders/CoderFactory.java | 44 --
.../apache/beam/sdk/coders/CoderProvider.java | 19 +-
.../beam/sdk/coders/CoderProviderRegistrar.java | 42 ++
.../apache/beam/sdk/coders/CoderProviders.java | 240 +++----
.../apache/beam/sdk/coders/CoderRegistrar.java | 45 --
.../apache/beam/sdk/coders/CoderRegistry.java | 618 +++++++------------
.../apache/beam/sdk/coders/CollectionCoder.java | 9 -
.../org/apache/beam/sdk/coders/CustomCoder.java | 8 -
.../apache/beam/sdk/coders/DefaultCoder.java | 119 +++-
.../apache/beam/sdk/coders/IterableCoder.java | 9 -
.../beam/sdk/coders/IterableLikeCoder.java | 12 -
.../org/apache/beam/sdk/coders/KvCoder.java | 7 -
.../org/apache/beam/sdk/coders/ListCoder.java | 8 -
.../org/apache/beam/sdk/coders/MapCoder.java | 12 -
.../beam/sdk/coders/SerializableCoder.java | 57 +-
.../org/apache/beam/sdk/coders/SetCoder.java | 9 -
.../apache/beam/sdk/coders/VarLongCoder.java | 7 -
.../beam/sdk/transforms/CombineFnBase.java | 4 +-
.../apache/beam/sdk/transforms/CombineFns.java | 4 +-
.../org/apache/beam/sdk/transforms/Create.java | 76 ++-
.../apache/beam/sdk/transforms/PTransform.java | 4 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 8 +-
.../apache/beam/sdk/transforms/WithKeys.java | 4 +-
.../sdk/transforms/windowing/GlobalWindow.java | 6 -
.../transforms/windowing/IntervalWindow.java | 7 -
.../org/apache/beam/sdk/values/PCollection.java | 2 +-
.../beam/sdk/values/TimestampedValue.java | 4 -
.../beam/sdk/coders/CoderFactoriesTest.java | 100 ---
.../beam/sdk/coders/CoderProvidersTest.java | 82 ++-
.../beam/sdk/coders/CoderRegistryTest.java | 167 ++---
.../beam/sdk/coders/DefaultCoderTest.java | 65 +-
.../beam/sdk/coders/IterableCoderTest.java | 17 -
.../apache/beam/sdk/coders/ListCoderTest.java | 17 -
.../apache/beam/sdk/coders/MapCoderTest.java | 20 -
.../beam/sdk/coders/SerializableCoderTest.java | 11 +
.../beam/sdk/transforms/CombineFnsTest.java | 10 +-
.../apache/beam/sdk/transforms/CreateTest.java | 6 +-
.../sdk/transforms/FlatMapElementsTest.java | 4 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 4 +-
.../beam/sdk/transforms/MapElementsTest.java | 4 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 17 +-
.../transforms/reflect/DoFnInvokersTest.java | 4 +-
.../apache/beam/sdk/values/TypedPValueTest.java | 28 +-
.../sdk/extensions/protobuf/ProtoCoder.java | 72 ++-
.../ProtobufCoderProviderRegistrar.java | 41 ++
.../protobuf/ProtobufCoderRegistrar.java | 39 --
.../sdk/extensions/protobuf/ProtoCoderTest.java | 7 +-
.../BigQueryCoderProviderRegistrar.java | 40 ++
.../io/gcp/bigquery/BigQueryCoderRegistrar.java | 39 --
.../io/gcp/bigquery/DynamicDestinations.java | 2 +-
.../pubsub/PubsubCoderProviderRegistrar.java | 37 ++
.../sdk/io/gcp/pubsub/PubsubCoderRegistrar.java | 35 --
.../BigQueryCoderProviderRegistrarTest.java | 40 ++
.../bigquery/BigQueryCoderRegistrarTest.java | 40 --
sdks/java/io/hadoop-common/pom.xml | 7 +
.../beam/sdk/io/hadoop/WritableCoder.java | 56 ++
.../beam/sdk/io/hadoop/WritableCoderTest.java | 10 +
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +-
.../beam/sdk/transforms/DistinctJava8Test.java | 8 -
.../beam/sdk/transforms/FilterJava8Test.java | 2 +-
.../beam/sdk/transforms/PartitionJava8Test.java | 2 +-
.../beam/sdk/transforms/WithKeysJava8Test.java | 9 -
73 files changed, 1121 insertions(+), 1689 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 6fd9755..7552b94 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -409,7 +409,7 @@ public class TfIdf {
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+ pipeline.getCoderRegistry().registerCoderForClass(URI.class, StringDelegateCoder.of(URI.class));
pipeline
.apply(new ReadDocuments(listInputDocuments(options)))
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
index d263643..3681ff5 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
@@ -48,7 +48,7 @@ public class TfIdfTest {
@Category(ValidatesRunner.class)
public void testTfIdf() throws Exception {
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+ pipeline.getCoderRegistry().registerCoderForClass(URI.class, StringDelegateCoder.of(URI.class));
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
.apply(Create.of(
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index a4d9639..6503fa2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -274,7 +274,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
input,
PCollection<T> output)
throws CannotProvideCoderException {
- // Similar logic to ParDo.MultiOutput.getDefaultOutputCoder.
+ // Similar logic to ParDo.MultiOutput.getOutputCoder.
@SuppressWarnings("unchecked")
KeyedWorkItemCoder<String, ElementAndRestriction<InputT, RestrictionT>> kwiCoder =
(KeyedWorkItemCoder) input.getCoder();
@@ -284,7 +284,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
return input
.getPipeline()
.getCoderRegistry()
- .getDefaultCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder);
+ .getCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 46f26a1..1e60ca3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -229,7 +229,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CoderRegistry reg = pipeline.getCoderRegistry();
StateTag<CombiningState<Long, long[], Long>> stateTag =
StateTags.combiningValue("summer",
- sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
+ sumLongFn.getAccumulatorCoder(reg, reg.getCoder(Long.class)), sumLongFn);
GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(0L));
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 0fe9585..85e55eb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -577,14 +577,4 @@ public class DirectRunnerTest implements Serializable {
return underlying.getDefaultOutputCoder();
}
}
-
- @Test
- public void fallbackCoderProviderAllowsInference() {
- // See https://issues.apache.org/jira/browse/BEAM-1642
- Pipeline p = getPipeline();
- p.getCoderRegistry().setFallbackCoderProvider(
- org.apache.beam.sdk.coders.AvroCoder.PROVIDER);
- p.apply(Create.of(Arrays.asList(100, 200))).apply(Count.<Integer>globally());
- p.run().waitUntilFinish();
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
index 528cfb0..043fe93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
@@ -33,20 +33,6 @@ import java.util.List;
*/
public abstract class AtomicCoder<T> extends StructuredCoder<T> {
/**
- * Returns an empty list.
- *
- * <p>{@link CoderFactories#fromStaticMethods(Class)} builds a {@link CoderFactory} from the
- * {@code #of()} method and this method, used to determine the components of an object. Because
- * {@link AtomicCoder} has no components, always returns an empty list.
- *
- * @param exampleValue unused, but part of the latent interface expected by {@link
- * CoderFactories#fromStaticMethods}
- */
- public static <T> List<Object> getInstanceComponents(T exampleValue) {
- return Collections.emptyList();
- }
-
- /**
* {@inheritDoc}.
*
* @throws NonDeterministicException
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 2aa2b44..f82c065 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import javax.annotation.Nullable;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
@@ -140,18 +141,39 @@ public class AvroCoder<T> extends CustomCoder<T> {
return new AvroCoder<>(type, schema);
}
- public static final CoderProvider PROVIDER = new CoderProvider() {
+ /**
+ * Returns a {@link CoderProvider} which uses the {@link AvroCoder} if possible for
+ * all types.
+ *
+ * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively
+ * accept dangerous types such as {@link Object}.
+ *
+ * <p>This method is invoked reflectively from {@link DefaultCoder}.
+ */
+ @SuppressWarnings("unused")
+ public static CoderProvider getCoderProvider() {
+ return new AvroCoderProvider();
+ }
+
+ /**
+ * A {@link CoderProvider} that constructs an {@link AvroCoder} for Avro compatible classes.
+ *
+ * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively
+ * accept dangerous types such as {@link Object}.
+ */
+ static class AvroCoderProvider extends CoderProvider {
@Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor) {
- // This is a downcast from `? super T` to T. However, because
- // it comes from a TypeDescriptor<T>, the class object itself
- // is the same so the supertype in question shares the same
- // generated AvroCoder schema.
- @SuppressWarnings("unchecked")
- Class<T> rawType = (Class<T>) typeDescriptor.getRawType();
- return AvroCoder.of(rawType);
+ public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+ List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+ try {
+ return AvroCoder.of(typeDescriptor);
+ } catch (AvroRuntimeException e) {
+ throw new CannotProvideCoderException(
+ String.format("%s is not compatible with Avro", typeDescriptor),
+ e);
+ }
}
- };
+ }
private final Class<T> type;
private final SerializableSchemaSupplier schemaSupplier;
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index 28cb627..d83d832 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -21,7 +21,6 @@ import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
import org.apache.beam.sdk.util.StreamUtils;
@@ -45,13 +44,6 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
return INSTANCE;
}
- /**
- * Returns an empty list. {@link ByteArrayCoder} has no components.
- */
- public static <T> List<Object> getInstanceComponents(T ignored) {
- return Collections.emptyList();
- }
-
/////////////////////////////////////////////////////////////////////////////
private static final ByteArrayCoder INSTANCE = new ByteArrayCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
index c37ec00..bc2ef3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.coders;
/**
- * The exception thrown when a {@link CoderProvider} cannot
+ * The exception thrown when a {@link CoderRegistry} or {@link CoderProvider} cannot
* provide a {@link Coder} that has been requested.
*/
public class CannotProvideCoderException extends Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 41e83ac..eeafbd2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -51,9 +51,9 @@ import org.apache.beam.sdk.values.TypeDescriptor;
*
* <p>{@link Coder} classes for compound types are often composed from coder classes for types
* contains therein. The composition of {@link Coder} instances into a coder for the compound
- * class is the subject of the {@link CoderFactory} type, which enables automatic generic
+ * class is the subject of the {@link CoderProvider} type, which enables automatic generic
* composition of {@link Coder} classes within the {@link CoderRegistry}. With particular
- * static methods on a compound {@link Coder} class, a {@link CoderFactory} can be automatically
+ * static methods on a compound {@link Coder} class, a {@link CoderProvider} can be automatically
* inferred. See {@link KvCoder} for an example of a simple compound {@link Coder} that supports
* automatic composition in the {@link CoderRegistry}.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
deleted file mode 100644
index 4f05c95..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.MoreObjects;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * Static utility methods for creating and working with {@link Coder}s.
- */
-public final class CoderFactories {
- private CoderFactories() { } // Static utility class
-
- /**
- * Creates a {@link CoderFactory} built from particular static methods of a class that
- * implements {@link Coder}.
- *
- * <p>The class must have the following static methods:
- *
- * <ul>
- * <li> {@code
- * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...)
- * }
- * <li> {@code
- * public static List<Object> getInstanceComponents(T exampleValue);
- * }
- * </ul>
- *
- * <p>The {@code of(...)} method will be used to construct a
- * {@code Coder<T>} from component {@link Coder}s.
- * It must accept one {@link Coder} argument for each
- * generic type parameter of {@code T}. If {@code T} takes no generic
- * type parameters, then the {@code of()} factory method should take
- * no arguments.
- *
- * <p>The {@code getInstanceComponents} method will be used to
- * decompose a value during the {@link Coder} inference process,
- * to automatically choose coders for the components.
- *
- * <p>Note that the class {@code T} to be coded may be a
- * not-yet-specialized generic class.
- * For a generic class {@code MyClass<X>} and an actual type parameter
- * {@code Foo}, the {@link CoderFactoryFromStaticMethods} will
- * accept any {@code Coder<Foo>} and produce a {@code Coder<MyClass<Foo>>}.
- *
- * <p>For example, the {@link CoderFactory} returned by
- * {@code fromStaticMethods(ListCoder.class)}
- * will produce a {@code Coder<List<X>>} for any {@code Coder Coder<X>}.
- */
- public static <T> CoderFactory fromStaticMethods(Class<T> clazz) {
- checkArgument(
- Coder.class.isAssignableFrom(clazz),
- "%s is not a subtype of %s",
- clazz.getName(),
- Coder.class.getSimpleName());
- return new CoderFactoryFromStaticMethods((Class<? extends Coder>) clazz);
- }
-
- /**
- * Creates a {@link CoderFactory} that always returns the
- * given coder.
- *
- * <p>The {@code getInstanceComponents} method of this
- * {@link CoderFactory} always returns an empty list.
- */
- public static <T> CoderFactory forCoder(Coder<T> coder) {
- return new CoderFactoryForCoder<>(coder);
- }
-
- /**
- * See {@link #fromStaticMethods} for a detailed description
- * of the characteristics of this {@link CoderFactory}.
- */
- private static class CoderFactoryFromStaticMethods implements CoderFactory {
-
- @Override
- @SuppressWarnings("rawtypes")
- public Coder<?> create(List<? extends Coder<?>> componentCoders) {
- try {
- return (Coder) factoryMethod.invoke(
- null /* static */, componentCoders.toArray());
- } catch (IllegalAccessException
- | IllegalArgumentException
- | InvocationTargetException
- | NullPointerException
- | ExceptionInInitializerError exn) {
- throw new IllegalStateException(
- "error when invoking Coder factory method " + factoryMethod,
- exn);
- }
- }
-
- @Override
- public List<Object> getInstanceComponents(Object value) {
- try {
- @SuppressWarnings("unchecked")
- List<Object> components = (List<Object>) getComponentsMethod.invoke(
- null /* static */, value);
- return components;
- } catch (IllegalAccessException
- | IllegalArgumentException
- | InvocationTargetException
- | NullPointerException
- | ExceptionInInitializerError exn) {
- throw new IllegalStateException(
- "error when invoking Coder getComponents method " + getComponentsMethod,
- exn);
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////
-
- // Method to create a coder given component coders
- // For a Coder class of kind * -> * -> ... n times ... -> *
- // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T>
- private final Method factoryMethod;
-
- // Method to decompose a value of type T into its parts.
- // For a Coder class of kind * -> * -> ... n times ... -> *
- // this has type T -> List<Object>
- // where the list has n elements.
- private final Method getComponentsMethod;
-
- /**
- * Returns a CoderFactory that invokes the given static factory method
- * to create the Coder.
- */
- private CoderFactoryFromStaticMethods(Class<? extends Coder> coderClazz) {
- this.factoryMethod = getFactoryMethod(coderClazz);
- this.getComponentsMethod = getInstanceComponentsMethod(coderClazz);
- }
-
- /**
- * Returns the static {@code of} constructor method on {@code coderClazz}
- * if it exists. It is assumed to have one {@link Coder} parameter for
- * each type parameter of {@code coderClazz}.
- */
- private Method getFactoryMethod(Class<?> coderClazz) {
- Method factoryMethodCandidate;
-
- // Find the static factory method of coderClazz named 'of' with
- // the appropriate number of type parameters.
- int numTypeParameters = coderClazz.getTypeParameters().length;
- Class<?>[] factoryMethodArgTypes = new Class<?>[numTypeParameters];
- Arrays.fill(factoryMethodArgTypes, Coder.class);
- try {
- factoryMethodCandidate =
- coderClazz.getDeclaredMethod("of", factoryMethodArgTypes);
- } catch (NoSuchMethodException | SecurityException exn) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "does not have an accessible method named 'of' with "
- + numTypeParameters + " arguments of Coder type",
- exn);
- }
- if (!Modifier.isStatic(factoryMethodCandidate.getModifiers())) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "method named 'of' with " + numTypeParameters
- + " arguments of Coder type is not static");
- }
- if (!coderClazz.isAssignableFrom(factoryMethodCandidate.getReturnType())) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "method named 'of' with " + numTypeParameters
- + " arguments of Coder type does not return a " + coderClazz);
- }
- try {
- if (!factoryMethodCandidate.isAccessible()) {
- factoryMethodCandidate.setAccessible(true);
- }
- } catch (SecurityException exn) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "method named 'of' with " + numTypeParameters
- + " arguments of Coder type is not accessible",
- exn);
- }
-
- return factoryMethodCandidate;
- }
-
- /**
- * Finds the static method on {@code coderType} to use
- * to decompose a value of type {@code T} into components,
- * each corresponding to an argument of the {@code of}
- * method.
- */
- private <T, CoderT extends Coder> Method getInstanceComponentsMethod(Class<CoderT> coderClazz) {
- TypeDescriptor<CoderT> coderType = TypeDescriptor.of(coderClazz);
- TypeDescriptor<T> argumentType = getCodedType(coderType);
-
- // getInstanceComponents may be implemented in a superclass,
- // so we search them all for an applicable method. We do not
- // try to be clever about finding the best overload. It may
- // be in a generic superclass, erased to accept an Object.
- // However, subtypes are listed before supertypes (it is a
- // topological ordering) so probably the best one will be chosen
- // if there are more than one (which should be rare)
- for (TypeDescriptor<?> supertype : coderType.getClasses()) {
- for (Method method : supertype.getRawType().getDeclaredMethods()) {
- if (method.getName().equals("getInstanceComponents")) {
- TypeDescriptor<?> formalArgumentType = supertype.getArgumentTypes(method).get(0);
- if (formalArgumentType.getRawType().isAssignableFrom(argumentType.getRawType())) {
- return method;
- }
- }
- }
- }
-
- throw new IllegalArgumentException(
- "cannot create a CoderFactory from " + coderType + ": "
- + "does not have an accessible method "
- + "'getInstanceComponents'");
- }
-
- /**
- * If {@code coderType} is a subclass of {@link Coder} for a specific type {@code T}, returns
- * {@code T.class}. Otherwise, raises IllegalArgumentException.
- */
- private <T, CoderT extends Coder> TypeDescriptor<T> getCodedType(
- TypeDescriptor<CoderT> coderType) {
- TypeDescriptor<? super CoderT> coderSupertype = coderType.getSupertype(Coder.class);
- ParameterizedType coderIface = (ParameterizedType) coderSupertype.getType();
- @SuppressWarnings("unchecked")
- TypeDescriptor<T> token =
- (TypeDescriptor<T>) TypeDescriptor.of(coderIface.getActualTypeArguments()[0]);
- return token;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("factoryMethod", factoryMethod)
- .add("getComponentsMethod", getComponentsMethod)
- .toString();
- }
- }
-
- /**
- * See {@link #forCoder} for a detailed description of this
- * {@link CoderFactory}.
- */
- private static class CoderFactoryForCoder<T> implements CoderFactory {
- private final Coder<T> coder;
-
- public CoderFactoryForCoder(Coder<T> coder) {
- this.coder = coder;
- }
-
- @Override
- public Coder<?> create(List<? extends Coder<?>> componentCoders) {
- return this.coder;
- }
-
- @Override
- public List<Object> getInstanceComponents(Object value) {
- return Collections.emptyList();
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("coder", coder)
- .toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
deleted file mode 100644
index 22d03fa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import java.util.List;
-
-/**
- * A {@link CoderFactory} creates coders and decomposes values.
- * It may operate on a parameterized type, such as {@link List},
- * in which case the {@link #create} method accepts a list of
- * coders to use for the type parameters.
- */
-public interface CoderFactory {
-
- /**
- * Returns a {@code Coder<?>}, given argument coder to use for
- * values of a particular type, given the Coders for each of
- * the type's generic parameter types.
- */
- Coder<?> create(List<? extends Coder<?>> componentCoders);
-
- /**
- * Returns a list of objects contained in {@code value}, one per
- * type argument, or {@code null} if none can be determined.
- * The list of returned objects should be the same size as the
- * list of coders required by {@link #create}.
- */
- List<Object> getInstanceComponents(Object value);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
index 0db73eb..ac042ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
@@ -17,18 +17,25 @@
*/
package org.apache.beam.sdk.coders;
+import java.util.List;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
- * A {@link CoderProvider} may create a {@link Coder} for
- * any concrete class.
+ * A {@link CoderProvider} provides {@link Coder}s.
+ *
+ * <p>It may operate on a parameterized type, such as {@link List}, in which case the
+ * {@link #coderFor} method accepts a list of coders to use for the type parameters.
*/
-public interface CoderProvider {
+public abstract class CoderProvider {
/**
- * Provides a coder for a given class, if possible.
+ * Returns a {@code Coder<T>} to use for values of a particular type, given the Coders for each of
+ * the type's generic parameter types.
*
- * @throws CannotProvideCoderException if no coder can be provided
+ * <p>Throws {@link CannotProvideCoderException} if this {@link CoderProvider} cannot provide
+ * a coder for this type and components.
*/
- <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException;
+ public abstract <T> Coder<T> coderFor(
+ TypeDescriptor<T> typeDescriptor, List<? extends Coder<?>> componentCoders)
+ throws CannotProvideCoderException;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
new file mode 100644
index 0000000..35d061d
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import com.google.auto.service.AutoService;
+import java.util.List;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * {@link Coder} creators have the ability to automatically have their
+ * {@link Coder coders} registered with this SDK by creating a {@link ServiceLoader} entry
+ * and a concrete implementation of this interface.
+ *
+ * <p>It is optional but recommended to use one of the many build time tools such as
+ * {@link AutoService} to generate the necessary META-INF files automatically.
+ */
+@Experimental
+public interface CoderProviderRegistrar {
+ /**
+ * Returns a list of {@link CoderProvider coder providers} which
+ * will be registered by default within each {@link CoderRegistry coder registry} instance.
+ *
+ * <p>See {@link CoderProviders} for convenience methods to construct a {@link CoderProvider}.
+ */
+ List<CoderProvider> getCoderProviders();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
index c072008..414fd8b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
@@ -19,146 +19,178 @@ package org.apache.beam.sdk.coders;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import com.google.common.base.MoreObjects;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
import java.util.List;
-import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
- * Static utility methods for working with {@link CoderProvider CoderProviders}.
+ * Static utility methods for creating and working with {@link CoderProvider}s.
*/
public final class CoderProviders {
-
- // Static utility class
- private CoderProviders() { }
+ private CoderProviders() { } // Static utility class
/**
- * Creates a {@link CoderProvider} built from particular static methods of a class that
- * implements {@link Coder}. The requirements for this method are precisely the requirements
- * for a {@link Coder} class to be usable with {@link DefaultCoder} annotations.
- *
- * <p>The class must have the following static method:
- *
- * <pre>{@code
- * public static Coder<T> of(TypeDescriptor<T> type)
- * }
- * </pre>
+ * Creates a {@link CoderProvider} from a class's
+ * {@code static <T> Coder<T> of(TypeDescriptor<T>, List<Coder<?>>)} method.
*/
- public static <T> CoderProvider fromStaticMethods(Class<T> clazz) {
- return new CoderProviderFromStaticMethods(clazz);
+ public static CoderProvider fromStaticMethods(Class<?> rawType, Class<?> coderClazz) {
+ checkArgument(
+ Coder.class.isAssignableFrom(coderClazz),
+ "%s is not a subtype of %s",
+ coderClazz.getName(),
+ Coder.class.getSimpleName());
+ return new CoderProviderFromStaticMethods(rawType, coderClazz);
}
-
/**
- * Returns a {@link CoderProvider} that consults each of the provider {@code coderProviders}
- * and returns the first {@link Coder} provided.
- *
- * <p>Note that the order in which the providers are listed matters: While the set of types
- * handled will be the union of those handled by all of the providers in the list, the actual
- * {@link Coder} provided by the first successful provider may differ, and may have inferior
- * properties. For example, not all {@link Coder Coders} are deterministic, handle {@code null}
- * values, or have comparable performance.
+ * Creates a {@link CoderProvider} that always returns the
+ * given coder for the specified type.
*/
- public static CoderProvider firstOf(CoderProvider... coderProviders) {
- return new FirstOf(ImmutableList.copyOf(coderProviders));
+ public static CoderProvider forCoder(TypeDescriptor<?> type, Coder<?> coder) {
+ return new CoderProviderForCoder(type, coder);
}
- ///////////////////////////////////////////////////////////////////////////////////////////////
-
/**
- * @see #firstOf
+ * See {@link #fromStaticMethods} for a detailed description
+ * of the characteristics of this {@link CoderProvider}.
*/
- private static class FirstOf implements CoderProvider {
-
- private Iterable<CoderProvider> providers;
-
- public FirstOf(Iterable<CoderProvider> providers) {
- this.providers = providers;
- }
+ private static class CoderProviderFromStaticMethods extends CoderProvider {
@Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
- List<String> messages = Lists.newArrayList();
- for (CoderProvider provider : providers) {
- try {
- return provider.getCoder(type);
- } catch (CannotProvideCoderException exc) {
- messages.add(String.format("%s could not provide a Coder for type %s: %s",
- provider, type, exc.getMessage()));
- }
+ public <T> Coder<T> coderFor(TypeDescriptor<T> type, List<? extends Coder<?>> componentCoders)
+ throws CannotProvideCoderException {
+ if (!this.rawType.equals(type.getRawType())) {
+ throw new CannotProvideCoderException(String.format(
+ "Unable to provide coder for %s, this factory can only provide coders for %s",
+ type,
+ this.rawType));
+ }
+ try {
+ return (Coder) factoryMethod.invoke(
+ null /* static */, componentCoders.toArray());
+ } catch (IllegalAccessException
+ | IllegalArgumentException
+ | InvocationTargetException
+ | NullPointerException
+ | ExceptionInInitializerError exn) {
+ throw new IllegalStateException(
+ "error when invoking Coder factory method " + factoryMethod,
+ exn);
}
- throw new CannotProvideCoderException(
- String.format("Cannot provide coder for type %s: %s.",
- type, Joiner.on("; ").join(messages)));
}
- }
- private static class CoderProviderFromStaticMethods implements CoderProvider {
+ ////////////////////////////////////////////////////////////////////////////////
- /** If true, then clazz has {@code of(TypeDescriptor)}. If false, {@code of(Class)}. */
- private final boolean takesTypeDescriptor;
- private final Class<?> clazz;
+ // The raw type used to filter the incoming type on.
+ private final Class<?> rawType;
- public CoderProviderFromStaticMethods(Class<?> clazz) {
- // Note that the second condition supports older classes, which only needed to provide
- // of(Class), not of(TypeDescriptor). Our own classes have updated to accept a
- // TypeDescriptor. Hence the error message points only to the current specification,
- // not both acceptable conditions.
- checkArgument(classTakesTypeDescriptor(clazz) || classTakesClass(clazz),
- "Class " + clazz.getCanonicalName()
- + " is missing required static method of(TypeDescriptor).");
+ // Method to create a coder given component coders
+ // For a Coder class of kind * -> * -> ... n times ... -> *
+ // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T>
+ private final Method factoryMethod;
- this.takesTypeDescriptor = classTakesTypeDescriptor(clazz);
- this.clazz = clazz;
+ /**
+ * Creates a CoderProvider that invokes the static {@code of} factory method
+ * of {@code coderClazz} to create the Coder.
+ */
+ private CoderProviderFromStaticMethods(Class<?> rawType, Class<?> coderClazz) {
+ this.rawType = rawType;
+ this.factoryMethod = getFactoryMethod(coderClazz);
}
- @Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
+ /**
+ * Returns the static {@code of} constructor method on {@code coderClazz}
+ * if it exists. It is assumed to have one {@link Coder} parameter for
+ * each type parameter of {@code coderClazz}.
+ */
+ private Method getFactoryMethod(Class<?> coderClazz) {
+ Method factoryMethodCandidate;
+
+ // Find the static factory method of coderClazz named 'of' with
+ // the appropriate number of type parameters.
+ int numTypeParameters = coderClazz.getTypeParameters().length;
+ Class<?>[] factoryMethodArgTypes = new Class<?>[numTypeParameters];
+ Arrays.fill(factoryMethodArgTypes, Coder.class);
try {
- if (takesTypeDescriptor) {
- @SuppressWarnings("unchecked")
- Coder<T> result = InstanceBuilder.ofType(Coder.class)
- .fromClass(clazz)
- .fromFactoryMethod("of")
- .withArg(TypeDescriptor.class, type)
- .build();
- return result;
- } else {
- @SuppressWarnings("unchecked")
- Coder<T> result = InstanceBuilder.ofType(Coder.class)
- .fromClass(clazz)
- .fromFactoryMethod("of")
- .withArg(Class.class, type.getRawType())
- .build();
- return result;
- }
- } catch (RuntimeException exc) {
- if (exc.getCause() instanceof InvocationTargetException) {
- throw new CannotProvideCoderException(exc.getCause().getCause());
+ factoryMethodCandidate =
+ coderClazz.getDeclaredMethod("of", factoryMethodArgTypes);
+ } catch (NoSuchMethodException | SecurityException exn) {
+ throw new IllegalArgumentException(
+ "cannot register Coder " + coderClazz + ": "
+ + "does not have an accessible method named 'of' with "
+ + numTypeParameters + " arguments of Coder type",
+ exn);
+ }
+ if (!Modifier.isStatic(factoryMethodCandidate.getModifiers())) {
+ throw new IllegalArgumentException(
+ "cannot register Coder " + coderClazz + ": "
+ + "method named 'of' with " + numTypeParameters
+ + " arguments of Coder type is not static");
+ }
+ if (!coderClazz.isAssignableFrom(factoryMethodCandidate.getReturnType())) {
+ throw new IllegalArgumentException(
+ "cannot register Coder " + coderClazz + ": "
+ + "method named 'of' with " + numTypeParameters
+ + " arguments of Coder type does not return a " + coderClazz);
+ }
+ try {
+ if (!factoryMethodCandidate.isAccessible()) {
+ factoryMethodCandidate.setAccessible(true);
}
- throw exc;
+ } catch (SecurityException exn) {
+ throw new IllegalArgumentException(
+ "cannot register Coder " + coderClazz + ": "
+ + "method named 'of' with " + numTypeParameters
+ + " arguments of Coder type is not accessible",
+ exn);
}
+
+ return factoryMethodCandidate;
}
- private boolean classTakesTypeDescriptor(Class<?> clazz) {
- try {
- clazz.getDeclaredMethod("of", TypeDescriptor.class);
- return true;
- } catch (NoSuchMethodException | SecurityException exc) {
- return false;
- }
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("rawType", rawType)
+ .add("factoryMethod", factoryMethod)
+ .toString();
}
+ }
- private boolean classTakesClass(Class<?> clazz) {
- try {
- clazz.getDeclaredMethod("of", Class.class);
- return true;
- } catch (NoSuchMethodException | SecurityException exc) {
- return false;
+ /**
+ * See {@link #forCoder} for a detailed description of this {@link CoderProvider}.
+ */
+ private static class CoderProviderForCoder extends CoderProvider {
+ private final Coder<?> coder;
+ private final TypeDescriptor<?> type;
+
+ public CoderProviderForCoder(TypeDescriptor<?> type, Coder<?> coder){
+ this.type = type;
+ this.coder = coder;
+ }
+
+ @Override
+ public <T> Coder<T> coderFor(TypeDescriptor<T> type, List<? extends Coder<?>> componentCoders)
+ throws CannotProvideCoderException {
+ if (!this.type.equals(type)) {
+ throw new CannotProvideCoderException(String.format(
+ "Unable to provide coder for %s, this factory can only provide coders for %s",
+ type,
+ this.type));
}
+ return (Coder) coder;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("type", type)
+ .add("coder", coder)
+ .toString();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
deleted file mode 100644
index fced976..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import java.util.ServiceLoader;
-import org.apache.beam.sdk.annotations.Experimental;
-
-/**
- * {@link Coder} creators have the ability to automatically have their
- * {@link Coder coders} registered with this SDK by creating a {@link ServiceLoader} entry
- * and a concrete implementation of this interface.
- *
- * <p>It is optional but recommended to use one of the many build time tools such as
- * {@link AutoService} to generate the necessary META-INF files automatically.
- */
-@Experimental
-public interface CoderRegistrar {
- /**
- * Returns a mapping of {@link Class classes} to {@link CoderFactory coder factories} which
- * will be registered by default within each {@link CoderRegistry coder registry} instance.
- *
- * <p>See {@link CoderFactories} for convenience methods to construct a {@link CoderFactory}.
- *
- * <p>Note that a warning is logged if multiple {@link CoderRegistrar coder registrars} provide
- * mappings for the same {@link Class}.
- */
- Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses();
-}