You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/05 21:45:13 UTC
[1/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow
creating coders through CoderFactory for a wider range of types
Repository: beam
Updated Branches:
refs/heads/master 650598836 -> 947fa68a5
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 5107355..9568324 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -23,13 +23,11 @@ import static org.junit.Assert.assertEquals;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Type;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -74,28 +72,10 @@ public class CoderRegistryTest {
private static class NotSerializableClass { }
@Test
- public void testSerializableFallbackCoderProvider() throws Exception {
- CoderRegistry registry = CoderRegistry.createDefault();
- registry.setFallbackCoderProvider(SerializableCoder.PROVIDER);
- Coder<?> serializableCoder = registry.getDefaultCoder(SerializableClass.class);
-
- assertEquals(serializableCoder, SerializableCoder.of(SerializableClass.class));
- }
-
- @Test
- public void testAvroFallbackCoderProvider() throws Exception {
- CoderRegistry registry = CoderRegistry.createDefault();
- registry.setFallbackCoderProvider(AvroCoder.PROVIDER);
- Coder<?> avroCoder = registry.getDefaultCoder(NotSerializableClass.class);
-
- assertEquals(avroCoder, AvroCoder.of(NotSerializableClass.class));
- }
-
- @Test
public void testRegisterInstantiatedCoder() throws Exception {
CoderRegistry registry = CoderRegistry.createDefault();
- registry.registerCoder(MyValue.class, MyValueCoder.of());
- assertEquals(registry.getDefaultCoder(MyValue.class), MyValueCoder.of());
+ registry.registerCoderForClass(MyValue.class, MyValueCoder.of());
+ assertEquals(registry.getCoder(MyValue.class), MyValueCoder.of());
}
@SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes
@@ -121,17 +101,9 @@ public class CoderRegistryTest {
}
@Test
- public void testRegisterInstantiatedCoderInvalidRawtype() throws Exception {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("may not be used with unspecialized generic classes");
- CoderRegistry registry = CoderRegistry.createDefault();
- registry.registerCoder(List.class, new MyListCoder());
- }
-
- @Test
public void testSimpleDefaultCoder() throws Exception {
CoderRegistry registry = CoderRegistry.createDefault();
- assertEquals(StringUtf8Coder.of(), registry.getDefaultCoder(String.class));
+ assertEquals(StringUtf8Coder.of(), registry.getCoder(String.class));
}
@Test
@@ -139,11 +111,9 @@ public class CoderRegistryTest {
CoderRegistry registry = CoderRegistry.createDefault();
thrown.expect(CannotProvideCoderException.class);
thrown.expectMessage(allOf(
- containsString(UnknownType.class.getCanonicalName()),
- containsString("No CoderFactory has been registered"),
- containsString("does not have a @DefaultCoder annotation"),
- containsString("does not implement Serializable")));
- registry.getDefaultCoder(UnknownType.class);
+ containsString(UnknownType.class.getName()),
+ containsString("Unable to provide a Coder for")));
+ registry.getCoder(UnknownType.class);
}
@Test
@@ -151,14 +121,15 @@ public class CoderRegistryTest {
CoderRegistry registry = CoderRegistry.createDefault();
TypeDescriptor<List<Integer>> listToken = new TypeDescriptor<List<Integer>>() {};
assertEquals(ListCoder.of(VarIntCoder.of()),
- registry.getDefaultCoder(listToken));
+ registry.getCoder(listToken));
- registry.registerCoder(MyValue.class, MyValueCoder.class);
+ registry.registerCoderProvider(
+ CoderProviders.fromStaticMethods(MyValue.class, MyValueCoder.class));
TypeDescriptor<KV<String, List<MyValue>>> kvToken =
new TypeDescriptor<KV<String, List<MyValue>>>() {};
assertEquals(KvCoder.of(StringUtf8Coder.of(),
ListCoder.of(MyValueCoder.of())),
- registry.getDefaultCoder(kvToken));
+ registry.getCoder(kvToken));
}
@@ -167,7 +138,7 @@ public class CoderRegistryTest {
CoderRegistry registry = CoderRegistry.createDefault();
TypeDescriptor<Map<Integer, String>> mapToken = new TypeDescriptor<Map<Integer, String>>() {};
assertEquals(MapCoder.of(VarIntCoder.of(), StringUtf8Coder.of()),
- registry.getDefaultCoder(mapToken));
+ registry.getCoder(mapToken));
}
@Test
@@ -177,21 +148,21 @@ public class CoderRegistryTest {
new TypeDescriptor<Map<Integer, Map<String, Double>>>() {};
assertEquals(
MapCoder.of(VarIntCoder.of(), MapCoder.of(StringUtf8Coder.of(), DoubleCoder.of())),
- registry.getDefaultCoder(mapToken));
+ registry.getCoder(mapToken));
}
@Test
public void testParameterizedDefaultSetCoder() throws Exception {
CoderRegistry registry = CoderRegistry.createDefault();
TypeDescriptor<Set<Integer>> setToken = new TypeDescriptor<Set<Integer>>() {};
- assertEquals(SetCoder.of(VarIntCoder.of()), registry.getDefaultCoder(setToken));
+ assertEquals(SetCoder.of(VarIntCoder.of()), registry.getCoder(setToken));
}
@Test
public void testParameterizedDefaultNestedSetCoder() throws Exception {
CoderRegistry registry = CoderRegistry.createDefault();
TypeDescriptor<Set<Set<Integer>>> setToken = new TypeDescriptor<Set<Set<Integer>>>() {};
- assertEquals(SetCoder.of(SetCoder.of(VarIntCoder.of())), registry.getDefaultCoder(setToken));
+ assertEquals(SetCoder.of(SetCoder.of(VarIntCoder.of())), registry.getCoder(setToken));
}
@Test
@@ -201,11 +172,11 @@ public class CoderRegistryTest {
thrown.expect(CannotProvideCoderException.class);
thrown.expectMessage(String.format(
- "Cannot provide coder for parameterized type %s: Unable to provide a default Coder for %s",
+ "Cannot provide coder for parameterized type %s: Unable to provide a Coder for %s",
listUnknownToken,
- UnknownType.class.getCanonicalName()));
+ UnknownType.class.getName()));
- registry.getDefaultCoder(listUnknownToken);
+ registry.getCoder(listUnknownToken);
}
@Test
@@ -214,7 +185,7 @@ public class CoderRegistryTest {
MyGenericClass<MyValue, List<MyValue>> instance =
new MyGenericClass<MyValue, List<MyValue>>() {};
- Coder<?> bazCoder = registry.getDefaultCoder(
+ Coder<?> bazCoder = registry.getCoder(
instance.getClass(),
MyGenericClass.class,
Collections.<Type, Coder<?>>singletonMap(
@@ -230,7 +201,7 @@ public class CoderRegistryTest {
MyGenericClass<MyValue, List<MyValue>> instance =
new MyGenericClass<MyValue, List<MyValue>>() {};
- Coder<?> fooCoder = registry.getDefaultCoder(
+ Coder<?> fooCoder = registry.getCoder(
instance.getClass(),
MyGenericClass.class,
Collections.<Type, Coder<?>>singletonMap(
@@ -242,49 +213,6 @@ public class CoderRegistryTest {
}
@Test
- public void testGetDefaultCoderFromIntegerValue() throws Exception {
- CoderRegistry registry = CoderRegistry.createDefault();
- Integer i = 13;
- Coder<Integer> coder = registry.getDefaultCoder(i);
- assertEquals(VarIntCoder.of(), coder);
- }
-
- @Test
- public void testGetDefaultCoderFromNullValue() throws Exception {
- CoderRegistry registry = CoderRegistry.createDefault();
- assertEquals(VoidCoder.of(), registry.getDefaultCoder((Void) null));
- }
-
- @Test
- public void testGetDefaultCoderFromKvValue() throws Exception {
- CoderRegistry registry = CoderRegistry.createDefault();
- KV<Integer, String> kv = KV.of(13, "hello");
- Coder<KV<Integer, String>> coder = registry.getDefaultCoder(kv);
- assertEquals(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()),
- coder);
- }
-
- @Test
- public void testGetDefaultCoderFromKvNullValue() throws Exception {
- CoderRegistry registry = CoderRegistry.createDefault();
- KV<Void, Void> kv = KV.of((Void) null, (Void) null);
- assertEquals(KvCoder.of(VoidCoder.of(), VoidCoder.of()),
- registry.getDefaultCoder(kv));
- }
-
- @Test
- public void testGetDefaultCoderFromNestedKvValue() throws Exception {
- CoderRegistry registry = CoderRegistry.createDefault();
- KV<Integer, KV<Long, KV<String, String>>> kv = KV.of(13, KV.of(17L, KV.of("hello", "goodbye")));
- Coder<KV<Integer, KV<Long, KV<String, String>>>> coder = registry.getDefaultCoder(kv);
- assertEquals(
- KvCoder.of(VarIntCoder.of(),
- KvCoder.of(VarLongCoder.of(),
- KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))),
- coder);
- }
-
- @Test
public void testTypeCompatibility() throws Exception {
CoderRegistry.verifyCompatible(BigEndianIntegerCoder.of(), Integer.class);
CoderRegistry.verifyCompatible(
@@ -335,7 +263,7 @@ public class CoderRegistryTest {
public void testDefaultCoderAnnotationGenericRawtype() throws Exception {
CoderRegistry registry = CoderRegistry.createDefault();
assertEquals(
- registry.getDefaultCoder(MySerializableGeneric.class),
+ registry.getCoder(MySerializableGeneric.class),
SerializableCoder.of(MySerializableGeneric.class));
}
@@ -343,7 +271,7 @@ public class CoderRegistryTest {
public void testDefaultCoderAnnotationGeneric() throws Exception {
CoderRegistry registry = CoderRegistry.createDefault();
assertEquals(
- registry.getDefaultCoder(new TypeDescriptor<MySerializableGeneric<String>>() {}),
+ registry.getCoder(new TypeDescriptor<MySerializableGeneric<String>>() {}),
SerializableCoder.of(MySerializableGeneric.class));
}
@@ -371,11 +299,8 @@ public class CoderRegistryTest {
CoderRegistry registry = CoderRegistry.createDefault();
thrown.expect(CannotProvideCoderException.class);
- thrown.expectMessage(allOf(
- containsString("No CoderFactory has been registered"),
- containsString("does not have a @DefaultCoder annotation"),
- containsString("does not implement Serializable")));
- registry.getDefaultCoder(TypeDescriptor.of(
+ thrown.expectMessage("Unable to provide a Coder");
+ registry.getCoder(TypeDescriptor.of(
TestGenericClass.class.getTypeParameters()[0]));
}
@@ -388,8 +313,7 @@ public class CoderRegistryTest {
TypeDescriptor type = TypeDescriptor.of(
TestSerializableGenericClass.class.getTypeParameters()[0]);
- assertEquals(registry.getDefaultCoder(type),
- SerializableCoder.of(type));
+ assertEquals(SerializableCoder.of(type), registry.getCoder(type));
}
private static class TestSerializableGenericClass<TestGenericT extends Serializable> {}
@@ -450,12 +374,6 @@ public class CoderRegistryTest {
return INSTANCE;
}
- @SuppressWarnings("unused")
- public static List<Object> getInstanceComponents(
- @SuppressWarnings("unused") MyValue exampleValue) {
- return Arrays.asList();
- }
-
@Override
public void encode(MyValue value, OutputStream outStream, Context context)
throws CoderException, IOException {
@@ -498,7 +416,12 @@ public class CoderRegistryTest {
}
}
- private static class UnknownType { }
+ /**
+ * This type is incompatible with all known coder providers such as Serializable,
+ * {@code @DefaultCoder} which allows testing scenarios where coder lookup fails.
+ */
+ private static class UnknownType {
+ }
@DefaultCoder(SerializableCoder.class)
private static class MySerializableGeneric<T extends Serializable> implements Serializable {
@@ -506,20 +429,34 @@ public class CoderRegistryTest {
private T foo;
}
+ /**
+ * This type is incompatible with all known coder providers such as Serializable,
+ * {@code @DefaultCoder} which allows testing the automatic registration mechanism.
+ */
+ private static class AutoRegistrationClass {
+ }
+
+ private static class AutoRegistrationClassCoder extends CustomCoder<AutoRegistrationClass> {
+ private static final AutoRegistrationClassCoder INSTANCE = new AutoRegistrationClassCoder();
+ }
+
@Test
- public void testAutomaticRegistrationOfCoders() throws Exception {
- assertEquals(CoderRegistry.createDefault().getDefaultCoder(MyValue.class), MyValueCoder.of());
+ public void testAutomaticRegistrationOfCoderProviders() throws Exception {
+ assertEquals(AutoRegistrationClassCoder.INSTANCE,
+ CoderRegistry.createDefault().getCoder(AutoRegistrationClass.class));
}
/**
- * A {@link CoderRegistrar} to demonstrate default {@link Coder} registration.
+ * A {@link CoderProviderRegistrar} to demonstrate default {@link Coder} registration.
*/
- @AutoService(CoderRegistrar.class)
- public static class RegisteredTestCoderRegistrar implements CoderRegistrar {
+ @AutoService(CoderProviderRegistrar.class)
+ public static class RegisteredTestCoderProviderRegistrar implements CoderProviderRegistrar {
@Override
- public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
- return ImmutableMap.<Class<?>, CoderFactory>of(
- MyValue.class, CoderFactories.forCoder(MyValueCoder.of()));
+ public List<CoderProvider> getCoderProviders() {
+ return ImmutableList.of(
+ CoderProviders.forCoder(
+ TypeDescriptor.of(AutoRegistrationClass.class),
+ AutoRegistrationClassCoder.INSTANCE));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
index d335b18..aa8d94c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
@@ -18,13 +18,15 @@
package org.apache.beam.sdk.coders;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
+import java.util.Collections;
import java.util.List;
+import org.apache.beam.sdk.coders.DefaultCoder.DefaultCoderProviderRegistrar.DefaultCoderProvider;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -32,15 +34,12 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
- * Tests of Coder defaults.
+ * Tests for {@link DefaultCoder}.
*/
@RunWith(JUnit4.class)
public class DefaultCoderTest {
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- public CoderRegistry registry = CoderRegistry.createDefault();
+ @Rule public ExpectedException thrown = ExpectedException.none();
@DefaultCoder(AvroCoder.class)
private static class AvroRecord {
@@ -75,6 +74,17 @@ public class DefaultCoderTest {
protected CustomSerializableCoder() {
super(CustomRecord.class, TypeDescriptor.of(CustomRecord.class));
}
+
+ @SuppressWarnings("unused")
+ public static CoderProvider getCoderProvider() {
+ return new CoderProvider() {
+ @Override
+ public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+ List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+ return CustomSerializableCoder.of((TypeDescriptor) typeDescriptor);
+ }
+ };
+ }
}
private static class OldCustomSerializableCoder extends SerializableCoder<OldCustomRecord> {
@@ -89,33 +99,52 @@ public class DefaultCoderTest {
protected OldCustomSerializableCoder() {
super(OldCustomRecord.class, TypeDescriptor.of(OldCustomRecord.class));
}
+
+ @SuppressWarnings("unused")
+ public static CoderProvider getCoderProvider() {
+ return new CoderProvider() {
+ @Override
+ public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+ List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+ return OldCustomSerializableCoder.of((Class) typeDescriptor.getRawType());
+ }
+ };
+ }
}
@Test
- public void testDefaultCoderClasses() throws Exception {
- assertThat(registry.getDefaultCoder(AvroRecord.class), instanceOf(AvroCoder.class));
- assertThat(registry.getDefaultCoder(SerializableBase.class),
- instanceOf(SerializableCoder.class));
- assertThat(registry.getDefaultCoder(SerializableRecord.class),
+ public void testCodersWithoutComponents() throws Exception {
+ CoderRegistry registry = CoderRegistry.createDefault();
+ registry.registerCoderProvider(
+ new DefaultCoderProvider());
+ assertThat(registry.getCoder(AvroRecord.class),
+ instanceOf(AvroCoder.class));
+ assertThat(registry.getCoder(SerializableRecord.class),
instanceOf(SerializableCoder.class));
- assertThat(registry.getDefaultCoder(CustomRecord.class),
+ assertThat(registry.getCoder(CustomRecord.class),
instanceOf(CustomSerializableCoder.class));
- assertThat(registry.getDefaultCoder(OldCustomRecord.class),
+ assertThat(registry.getCoder(OldCustomRecord.class),
instanceOf(OldCustomSerializableCoder.class));
}
@Test
public void testDefaultCoderInCollection() throws Exception {
- assertThat(registry.getDefaultCoder(new TypeDescriptor<List<AvroRecord>>(){}),
- instanceOf(ListCoder.class));
- assertThat(registry.getDefaultCoder(new TypeDescriptor<List<SerializableRecord>>(){}),
- equalTo((Coder<List<SerializableRecord>>)
+ CoderRegistry registry = CoderRegistry.createDefault();
+ registry.registerCoderProvider(
+ new DefaultCoderProvider());
+ Coder<List<AvroRecord>> avroRecordCoder =
+ registry.getCoder(new TypeDescriptor<List<AvroRecord>>(){});
+ assertThat(avroRecordCoder, instanceOf(ListCoder.class));
+ assertThat(((ListCoder) avroRecordCoder).getElemCoder(), instanceOf(AvroCoder.class));
+ assertThat(registry.getCoder(new TypeDescriptor<List<SerializableRecord>>(){}),
+ Matchers.<Coder<List<SerializableRecord>>>equalTo(
ListCoder.of(SerializableCoder.of(SerializableRecord.class))));
}
@Test
public void testUnknown() throws Exception {
thrown.expect(CannotProvideCoderException.class);
- registry.getDefaultCoder(Unknown.class);
+ new DefaultCoderProvider().coderFor(
+ TypeDescriptor.of(Unknown.class), Collections.<Coder<?>>emptyList());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
index 1e56135..1141fb5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
@@ -18,8 +18,6 @@
package org.apache.beam.sdk.coders;
import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import java.util.Arrays;
@@ -62,21 +60,6 @@ public class IterableCoderTest {
}
@Test
- public void testGetInstanceComponentsNonempty() {
- Iterable<Integer> iterable = Arrays.asList(2, 58, 99, 5);
- List<Object> components = IterableCoder.getInstanceComponents(iterable);
- assertEquals(1, components.size());
- assertEquals(2, components.get(0));
- }
-
- @Test
- public void testGetInstanceComponentsEmpty() {
- Iterable<Integer> iterable = Arrays.asList();
- List<Object> components = IterableCoder.getInstanceComponents(iterable);
- assertNull(components);
- }
-
- @Test
public void testCoderSerializable() throws Exception {
CoderProperties.coderSerializable(TEST_CODER);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
index 35239d6..bcc4400 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
@@ -18,8 +18,6 @@
package org.apache.beam.sdk.coders;
import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import java.util.Arrays;
@@ -62,21 +60,6 @@ public class ListCoderTest {
}
@Test
- public void testGetInstanceComponentsNonempty() throws Exception {
- List<Integer> list = Arrays.asList(21, 5, 3, 5);
- List<Object> components = ListCoder.getInstanceComponents(list);
- assertEquals(1, components.size());
- assertEquals(21, components.get(0));
- }
-
- @Test
- public void testGetInstanceComponentsEmpty() throws Exception {
- List<Integer> list = Arrays.asList();
- List<Object> components = ListCoder.getInstanceComponents(list);
- assertNull(components);
- }
-
- @Test
public void testEmptyList() throws Exception {
List<Integer> list = Collections.emptyList();
Coder<List<Integer>> coder = ListCoder.of(VarIntCoder.of());
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
index a52e6cb..19c895e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
@@ -18,14 +18,11 @@
package org.apache.beam.sdk.coders;
import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -64,23 +61,6 @@ public class MapCoderTest {
MapCoder.of(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
}
- @Test
- public void testGetInstanceComponentsNonempty() {
- Map<Integer, String> map = new HashMap<>();
- map.put(17, "foozle");
- List<Object> components = MapCoder.getInstanceComponents(map);
- assertEquals(2, components.size());
- assertEquals(17, components.get(0));
- assertEquals("foozle", components.get(1));
- }
-
- @Test
- public void testGetInstanceComponentsEmpty() {
- Map<Integer, String> map = new HashMap<>();
- List<Object> components = MapCoder.getInstanceComponents(map);
- assertNull(components);
- }
-
/**
* Generated data to check that the wire format has not changed. To regenerate, see
* {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index ef6df34..d97eea6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.coders;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -230,4 +231,14 @@ public class SerializableCoderTest implements Serializable {
SerializableCoder.of(MyRecord.class).getEncodedTypeDescriptor(),
Matchers.equalTo(TypeDescriptor.of(MyRecord.class)));
}
+
+ private static class AutoRegistration implements Serializable {
+ private static final long serialVersionUID = 42L;
+ }
+
+ @Test
+ public void testSerializableCoderProviderIsRegistered() throws Exception {
+ assertThat(CoderRegistry.createDefault().getCoder(AutoRegistration.class),
+ instanceOf(SerializableCoder.class));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 62edac9..8a4d563 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -104,7 +104,7 @@ public class CombineFnsTest {
@Test
@Category(ValidatesRunner.class)
public void testComposedCombine() {
- p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
+ p.getCoderRegistry().registerCoderForClass(UserString.class, UserStringCoder.of());
PCollection<KV<String, KV<Integer, UserString>>> perKeyInput = p.apply(
Create.timestamped(
@@ -156,7 +156,7 @@ public class CombineFnsTest {
@Test
@Category(ValidatesRunner.class)
public void testComposedCombineWithContext() {
- p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
+ p.getCoderRegistry().registerCoderForClass(UserString.class, UserStringCoder.of());
PCollectionView<String> view = p
.apply(Create.of("I"))
@@ -218,8 +218,10 @@ public class CombineFnsTest {
@Test
@Category(ValidatesRunner.class)
public void testComposedCombineNullValues() {
- p.getCoderRegistry().registerCoder(UserString.class, NullableCoder.of(UserStringCoder.of()));
- p.getCoderRegistry().registerCoder(String.class, NullableCoder.of(StringUtf8Coder.of()));
+ p.getCoderRegistry().registerCoderForClass(
+ UserString.class, NullableCoder.of(UserStringCoder.of()));
+ p.getCoderRegistry().registerCoderForClass(
+ String.class, NullableCoder.of(StringUtf8Coder.of()));
PCollection<KV<String, KV<Integer, UserString>>> perKeyInput = p.apply(
Create.timestamped(
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 76f61b3..a458812 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -321,7 +321,7 @@ public class CreateTest {
@Test
public void testCreateTimestampedDefaultOutputCoderUsingTypeDescriptor() throws Exception {
Coder<Record> coder = new RecordCoder();
- p.getCoderRegistry().registerCoder(Record.class, coder);
+ p.getCoderRegistry().registerCoderForClass(Record.class, coder);
PBegin pBegin = PBegin.in(p);
Create.TimestampedValues<Record> values =
Create.timestamped(
@@ -364,7 +364,7 @@ public class CreateTest {
@Test
public void testCreateDefaultOutputCoderUsingInference() throws Exception {
Coder<Record> coder = new RecordCoder();
- p.getCoderRegistry().registerCoder(Record.class, coder);
+ p.getCoderRegistry().registerCoderForClass(Record.class, coder);
PBegin pBegin = PBegin.in(p);
Create.Values<Record> values = Create.of(new Record(), new Record(), new Record());
Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
@@ -384,7 +384,7 @@ public class CreateTest {
@Test
public void testCreateDefaultOutputCoderUsingTypeDescriptor() throws Exception {
Coder<Record> coder = new RecordCoder();
- p.getCoderRegistry().registerCoder(Record.class, coder);
+ p.getCoderRegistry().registerCoderForClass(Record.class, coder);
PBegin pBegin = PBegin.in(p);
Create.Values<Record> values =
Create.of(new Record(), new Record2()).withType(new TypeDescriptor<Record>() {});
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index b24071e..11f284f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -94,8 +94,8 @@ public class FlatMapElementsTest implements Serializable {
assertThat(output.getTypeDescriptor(),
equalTo((TypeDescriptor<String>) new TypeDescriptor<String>() {}));
- assertThat(pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor()),
- equalTo(pipeline.getCoderRegistry().getDefaultCoder(new TypeDescriptor<String>() {})));
+ assertThat(pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor()),
+ equalTo(pipeline.getCoderRegistry().getCoder(new TypeDescriptor<String>() {})));
// Make sure the pipeline runs
pipeline.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 6982e01..aba33eb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -392,7 +393,8 @@ public class GroupByKeyTest {
final int numValues = 10;
final int numKeys = 5;
- p.getCoderRegistry().registerCoder(BadEqualityKey.class, DeterministicKeyCoder.class);
+ p.getCoderRegistry().registerCoderProvider(
+ CoderProviders.fromStaticMethods(BadEqualityKey.class, DeterministicKeyCoder.class));
// construct input data
List<KV<BadEqualityKey, Long>> input = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 7bf94a0..241b60e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -187,8 +187,8 @@ public class MapElementsTest implements Serializable {
}));
assertThat(output.getTypeDescriptor(),
equalTo((TypeDescriptor<String>) new TypeDescriptor<String>() {}));
- assertThat(pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor()),
- equalTo(pipeline.getCoderRegistry().getDefaultCoder(new TypeDescriptor<String>() {})));
+ assertThat(pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor()),
+ equalTo(pipeline.getCoderRegistry().getCoder(new TypeDescriptor<String>() {})));
// Make sure the pipeline runs too
pipeline.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 073957f..cbbe7f1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -985,11 +985,6 @@ public class ParDoTest implements Serializable {
return INSTANCE;
}
- @SuppressWarnings("unused") // used to create a CoderFactory
- public static List<Object> getInstanceComponents(TestDummy exampleValue) {
- return Collections.emptyList();
- }
-
@Override
public void encode(TestDummy value, OutputStream outStream, Context context)
throws CoderException, IOException {
@@ -1657,7 +1652,7 @@ public class ParDoTest implements Serializable {
public void testValueStateCoderInference() {
final String stateId = "foo";
MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
- pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+ pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
DoFn<KV<String, Integer>, MyInteger> fn =
new DoFn<KV<String, Integer>, MyInteger>() {
@@ -1750,7 +1745,7 @@ public class ParDoTest implements Serializable {
public void testCoderInferenceOfList() {
final String stateId = "foo";
MyIntegerCoder myIntegerCoder = MyIntegerCoder.of();
- pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+ pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
DoFn<KV<String, Integer>, List<MyInteger>> fn =
new DoFn<KV<String, Integer>, List<MyInteger>>() {
@@ -1966,7 +1961,7 @@ public class ParDoTest implements Serializable {
public void testBagStateCoderInference() {
final String stateId = "foo";
Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
- pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+ pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
DoFn<KV<String, Integer>, List<MyInteger>> fn =
new DoFn<KV<String, Integer>, List<MyInteger>>() {
@@ -2085,7 +2080,7 @@ public class ParDoTest implements Serializable {
final String stateId = "foo";
final String countStateId = "count";
Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
- pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+ pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
DoFn<KV<String, Integer>, Set<MyInteger>> fn =
new DoFn<KV<String, Integer>, Set<MyInteger>>() {
@@ -2217,7 +2212,7 @@ public class ParDoTest implements Serializable {
final String stateId = "foo";
final String countStateId = "count";
Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
- pipeline.getCoderRegistry().registerCoder(MyInteger.class, myIntegerCoder);
+ pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn =
new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
@@ -2345,7 +2340,7 @@ public class ParDoTest implements Serializable {
@Test
@Category({ValidatesRunner.class, UsesStatefulParDo.class})
public void testCombiningStateCoderInference() {
- pipeline.getCoderRegistry().registerCoder(MyInteger.class, MyIntegerCoder.of());
+ pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, MyIntegerCoder.of());
final String stateId = "foo";
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index f26dd59..489493a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
@@ -422,7 +423,8 @@ public class DoFnInvokersTest {
DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
CoderRegistry coderRegistry = CoderRegistry.createDefault();
- coderRegistry.registerCoder(RestrictionWithDefaultTracker.class, CoderForDefaultTracker.class);
+ coderRegistry.registerCoderProvider(CoderProviders.fromStaticMethods(
+ RestrictionWithDefaultTracker.class, CoderForDefaultTracker.class));
assertThat(
invoker.<RestrictionWithDefaultTracker>invokeGetRestrictionCoder(coderRegistry),
instanceOf(CoderForDefaultTracker.class));
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index f752b1c..e1eb2fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -77,14 +78,10 @@ public class TypedPValueTest {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("No Coder has been manually specified");
- thrown.expectMessage(
- containsString("Building a Coder using a registered CoderFactory failed"));
- thrown.expectMessage(
- containsString("Building a Coder from the @DefaultCoder annotation failed"));
- thrown.expectMessage(
- containsString("Building a Coder from the fallback CoderProvider failed"));
+ thrown.expectMessage("Building a Coder using a registered CoderProvider failed");
- tuple.get(untypedOutputTag).getCoder();
+ Coder<?> coder = tuple.get(untypedOutputTag).getCoder();
+ System.out.println(coder);
}
@Test
@@ -96,12 +93,7 @@ public class TypedPValueTest {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("No Coder has been manually specified");
- thrown.expectMessage(
- containsString("Building a Coder using a registered CoderFactory failed"));
- thrown.expectMessage(
- containsString("Building a Coder from the @DefaultCoder annotation failed"));
- thrown.expectMessage(
- containsString("Building a Coder from the fallback CoderProvider failed"));
+ thrown.expectMessage("Building a Coder using a registered CoderProvider failed");
tuple.get(untypedOutputTag).getCoder();
}
@@ -126,7 +118,10 @@ public class TypedPValueTest {
assertThat(tuple.get(typedOutputTag).getCoder(), instanceOf(VarIntCoder.class));
}
- // A simple class for which there should be no obvious Coder.
+ /**
+ * This type is incompatible with all known coder providers such as Serializable,
+ * {@code @DefaultCoder} which allows testing coder registry lookup failure cases.
+ */
static class EmptyClass {
}
@@ -149,10 +144,7 @@ public class TypedPValueTest {
thrown.expectMessage(not(containsString("erasure")));
thrown.expectMessage(not(containsString("see TupleTag Javadoc")));
// Instead, expect output suggesting other possible fixes.
- thrown.expectMessage(containsString("Building a Coder using a registered CoderFactory failed"));
- thrown.expectMessage(
- containsString("Building a Coder from the @DefaultCoder annotation failed"));
- thrown.expectMessage(containsString("Building a Coder from the fallback CoderProvider failed"));
+ thrown.expectMessage("Building a Coder using a registered CoderProvider failed");
input.getCoder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
index 9577c6e..968a2fa 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -31,6 +31,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
@@ -41,6 +42,7 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderProvider;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -105,13 +107,6 @@ import org.apache.beam.sdk.values.TypeDescriptor;
*/
public class ProtoCoder<T extends Message> extends CustomCoder<T> {
- /**
- * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
- * {@link ExtensionRegistry}.
- */
- public static CoderProvider coderProvider() {
- return PROVIDER;
- }
/**
* Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}.
@@ -291,36 +286,47 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
return memoizedParser;
}
- static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {};
+ /**
+ * Returns a {@link CoderProvider} which uses the {@link ProtoCoder} for
+ * {@link Message proto messages}.
+ *
+ * <p>This method is invoked reflectively from {@link DefaultCoder}.
+ */
+ public static CoderProvider getCoderProvider() {
+ return new ProtoCoderProvider();
+ }
+
+ static final TypeDescriptor<Message> MESSAGE_TYPE = new TypeDescriptor<Message>() {};
/**
- * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
- * {@link #coderProvider()}.
+ * A {@link CoderProvider} for {@link Message proto messages}.
*/
- private static final CoderProvider PROVIDER =
- new CoderProvider() {
- @Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
- if (!type.isSubtypeOf(CHECK)) {
- throw new CannotProvideCoderException(
- String.format(
- "Cannot provide %s because %s is not a subclass of %s",
- ProtoCoder.class.getSimpleName(),
- type,
- Message.class.getName()));
- }
+ private static class ProtoCoderProvider extends CoderProvider {
- @SuppressWarnings("unchecked")
- TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? extends Message>) type;
- try {
- @SuppressWarnings("unchecked")
- Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
- return coder;
- } catch (IllegalArgumentException e) {
- throw new CannotProvideCoderException(e);
- }
- }
- };
+ @Override
+ public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+ List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+ if (!typeDescriptor.isSubtypeOf(MESSAGE_TYPE)) {
+ throw new CannotProvideCoderException(
+ String.format(
+ "Cannot provide %s because %s is not a subclass of %s",
+ ProtoCoder.class.getSimpleName(),
+ typeDescriptor,
+ Message.class.getName()));
+ }
+
+ @SuppressWarnings("unchecked")
+ TypeDescriptor<? extends Message> messageType =
+ (TypeDescriptor<? extends Message>) typeDescriptor;
+ try {
+ @SuppressWarnings("unchecked")
+ Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
+ return coder;
+ } catch (IllegalArgumentException e) {
+ throw new CannotProvideCoderException(e);
+ }
+ }
+ }
private SortedSet<String> getSortedExtensionClasses() {
SortedSet<String> ret = new TreeSet<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java
new file mode 100644
index 0000000..1cb79d7
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderProviderRegistrar.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link CoderProviderRegistrar} for standard types used with Google Protobuf.
+ */
+@AutoService(CoderProviderRegistrar.class)
+public class ProtobufCoderProviderRegistrar implements CoderProviderRegistrar {
+ @Override
+ public List<CoderProvider> getCoderProviders() {
+ return ImmutableList.of(
+ CoderProviders.forCoder(TypeDescriptor.of(ByteString.class), ByteStringCoder.of()),
+ ProtoCoder.getCoderProvider());
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java
deleted file mode 100644
index fb39a14..0000000
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufCoderRegistrar.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.protobuf;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.ByteString;
-import java.util.Map;
-import org.apache.beam.sdk.coders.CoderFactories;
-import org.apache.beam.sdk.coders.CoderFactory;
-import org.apache.beam.sdk.coders.CoderRegistrar;
-
-/**
- * A {@link CoderRegistrar} for standard types used with Google Protobuf.
- */
-@AutoService(CoderRegistrar.class)
-public class ProtobufCoderRegistrar implements CoderRegistrar {
- @Override
- public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
- return ImmutableMap.<Class<?>, CoderFactory>of(
- ByteString.class, CoderFactories.forCoder(ByteStringCoder.of()));
- }
-}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
index 6d00b86..d79bf4c 100644
--- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
+++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import com.google.common.collect.ImmutableList;
+import java.util.Collections;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
@@ -53,7 +54,8 @@ public class ProtoCoderTest {
assertEquals(
ProtoCoder.of(new TypeDescriptor<MessageA>() {}),
- ProtoCoder.coderProvider().getCoder(new TypeDescriptor<MessageA>() {}));
+ ProtoCoder.getCoderProvider().coderFor(
+ new TypeDescriptor<MessageA>() {}, Collections.<Coder<?>>emptyList()));
}
@Test
@@ -61,7 +63,8 @@ public class ProtoCoderTest {
thrown.expect(CannotProvideCoderException.class);
thrown.expectMessage("java.lang.Integer is not a subclass of com.google.protobuf.Message");
- ProtoCoder.coderProvider().getCoder(new TypeDescriptor<Integer>() {});
+ ProtoCoder.getCoderProvider().coderFor(
+ new TypeDescriptor<Integer>() {}, Collections.<Coder<?>>emptyList());
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java
new file mode 100644
index 0000000..c6fd849
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrar.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link CoderProviderRegistrar} for standard types used with {@link BigQueryIO}.
+ */
+@AutoService(CoderProviderRegistrar.class)
+public class BigQueryCoderProviderRegistrar implements CoderProviderRegistrar {
+ @Override
+ public List<CoderProvider> getCoderProviders() {
+ return ImmutableList.of(
+ CoderProviders.forCoder(TypeDescriptor.of(TableRow.class), TableRowJsonCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(TableRowInfo.class), TableRowInfoCoder.of()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
deleted file mode 100644
index 847c7b5..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrar.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.CoderFactories;
-import org.apache.beam.sdk.coders.CoderFactory;
-import org.apache.beam.sdk.coders.CoderRegistrar;
-
-/**
- * A {@link CoderRegistrar} for standard types used with {@link BigQueryIO}.
- */
-@AutoService(CoderRegistrar.class)
-public class BigQueryCoderRegistrar implements CoderRegistrar {
- @Override
- public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
- return ImmutableMap.of(
- TableRow.class, CoderFactories.forCoder(TableRowJsonCoder.of()),
- TableRowInfo.class, CoderFactories.forCoder(TableRowInfoCoder.of()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index dc8bcff..edb1e0d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -168,7 +168,7 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
Type parameter = parameterized.getActualTypeArguments()[1];
@SuppressWarnings("unchecked")
Class<DestinationT> parameterClass = (Class<DestinationT>) parameter;
- return registry.getDefaultCoder(parameterClass);
+ return registry.getCoder(parameterClass);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
new file mode 100644
index 0000000..57bc903
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** A {@link CoderProviderRegistrar} for standard types used with {@link PubsubIO}. */
+@AutoService(CoderProviderRegistrar.class)
+public class PubsubCoderProviderRegistrar implements CoderProviderRegistrar {
+ @Override
+ public List<CoderProvider> getCoderProviders() {
+ return ImmutableList.of(
+ CoderProviders.forCoder(TypeDescriptor.of(PubsubMessage.class),
+ PubsubMessageWithAttributesCoder.of()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
deleted file mode 100644
index 062f350..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.CoderFactories;
-import org.apache.beam.sdk.coders.CoderFactory;
-import org.apache.beam.sdk.coders.CoderRegistrar;
-
-/** A {@link CoderRegistrar} for standard types used with {@link PubsubIO}. */
-@AutoService(CoderRegistrar.class)
-public class PubsubCoderRegistrar implements CoderRegistrar {
- @Override
- public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
- return ImmutableMap.<Class<?>, CoderFactory>of(
- PubsubMessage.class, CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java
new file mode 100644
index 0000000..bd5246a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderProviderRegistrarTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link BigQueryCoderProviderRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryCoderProviderRegistrarTest {
+ @Test
+ public void testTableRowCoderIsRegistered() throws Exception {
+ CoderRegistry.createDefault().getCoder(TableRow.class);
+ }
+
+ @Test
+ public void testTableRowInfoCoderIsRegistered() throws Exception {
+ CoderRegistry.createDefault().getCoder(TableRowInfo.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
deleted file mode 100644
index e7e9fe1..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryCoderRegistrarTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableRow;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link BigQueryCoderRegistrar}.
- */
-@RunWith(JUnit4.class)
-public class BigQueryCoderRegistrarTest {
- @Test
- public void testTableRowCoderIsRegistered() throws Exception {
- CoderRegistry.createDefault().getDefaultCoder(TableRow.class);
- }
-
- @Test
- public void testTableRowInfoCoderIsRegistered() throws Exception {
- CoderRegistry.createDefault().getDefaultCoder(TableRowInfo.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml
index ebb4b39..9054f99 100644
--- a/sdks/java/io/hadoop-common/pom.xml
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -77,5 +77,12 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
+ <!-- build dependencies -->
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <optional>true</optional>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
index 15877b0..8fddfe0 100644
--- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.hadoop;
+import com.google.auto.service.AutoService;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -25,9 +26,14 @@ import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
@@ -110,4 +116,54 @@ public class WritableCoder<T extends Writable> extends CustomCoder<T> {
return type.hashCode();
}
+ /**
+ * Returns a {@link CoderProvider} which uses the {@link WritableCoder} for Hadoop
+ * {@link Writable writable types}.
+ *
+ * <p>This method is invoked reflectively from {@link DefaultCoder}.
+ */
+ public static CoderProvider getCoderProvider() {
+ return new WritableCoderProvider();
+ }
+
+ /**
+ * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
+ * {@link Writable writable types}.
+ */
+ @AutoService(CoderProviderRegistrar.class)
+ public static class WritableCoderProviderRegistrar implements CoderProviderRegistrar {
+
+ @Override
+ public List<CoderProvider> getCoderProviders() {
+ return Collections.singletonList(getCoderProvider());
+ }
+ }
+
+ /**
+ * A {@link CoderProvider} for Hadoop {@link Writable writable types}.
+ */
+ private static class WritableCoderProvider extends CoderProvider {
+ private static final TypeDescriptor<Writable> WRITABLE_TYPE = new TypeDescriptor<Writable>() {};
+
+ @Override
+ public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+ List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+ if (!typeDescriptor.isSubtypeOf(WRITABLE_TYPE)) {
+ throw new CannotProvideCoderException(
+ String.format(
+ "Cannot provide %s because %s does not implement the interface %s",
+ WritableCoder.class.getSimpleName(),
+ typeDescriptor,
+ Writable.class.getName()));
+ }
+
+ try {
+ @SuppressWarnings("unchecked")
+ Coder<T> coder = WritableCoder.of((Class) typeDescriptor.getRawType());
+ return coder;
+ } catch (IllegalArgumentException e) {
+ throw new CannotProvideCoderException(e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
index 8127773..9f2a54d 100644
--- a/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
+++ b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java
@@ -17,6 +17,10 @@
*/
package org.apache.beam.sdk.io.hadoop;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
@@ -42,4 +46,10 @@ public class WritableCoderTest {
CoderProperties.coderDecodeEncodeEqual(coder, value);
}
+
+ @Test
+ public void testAutomaticRegistrationOfCoderProvider() throws Exception {
+ assertThat(CoderRegistry.createDefault().getCoder(NullWritable.class),
+ instanceOf(WritableCoder.class));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e676455..ba84c2a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -304,7 +304,7 @@ public class KafkaIO {
Class<T> clazz = (Class<T>) parameter;
try {
- return NullableCoder.of(coderRegistry.getDefaultCoder(clazz));
+ return NullableCoder.of(coderRegistry.getCoder(clazz));
} catch (CannotProvideCoderException e) {
throw new RuntimeException(
String.format("Unable to automatically infer a Coder for "
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
index 790f51e..61df23b 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.transforms;
import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
@@ -89,13 +88,6 @@ public class DistinctJava8Test {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes");
- thrown.expectMessage("No Coder has been manually specified");
- thrown.expectMessage(
- containsString("Building a Coder using a registered CoderFactory failed"));
- thrown.expectMessage(
- containsString("Building a Coder from the @DefaultCoder annotation failed"));
- thrown.expectMessage(
- containsString("Building a Coder from the fallback CoderProvider failed"));
// Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is
// implemented with
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
index a3481e1..b38250e 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
@@ -91,7 +91,7 @@ public class FilterJava8Test implements Serializable {
.apply(Filter.by(s -> true));
thrown.expect(CannotProvideCoderException.class);
- pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor());
+ pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor());
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
index 7d97740..94353a5 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/PartitionJava8Test.java
@@ -69,6 +69,6 @@ public class PartitionJava8Test implements Serializable {
.apply(Partition.of(1, (element, numPartitions) -> 0));
thrown.expect(CannotProvideCoderException.class);
- pipeline.getCoderRegistry().getDefaultCoder(output.get(0).getTypeDescriptor());
+ pipeline.getCoderRegistry().getCoder(output.get(0).getTypeDescriptor());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
index 4f0361e..260dd24 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.transforms;
-import static org.hamcrest.Matchers.containsString;
-
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -69,13 +67,6 @@ public class WithKeysJava8Test {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys");
- thrown.expectMessage("No Coder has been manually specified");
- thrown.expectMessage(
- containsString("Building a Coder using a registered CoderFactory failed"));
- thrown.expectMessage(
- containsString("Building a Coder from the @DefaultCoder annotation failed"));
- thrown.expectMessage(
- containsString("Building a Coder from the fallback CoderProvider failed"));
p.run();
}
[4/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow
creating coders through CoderFactory for a wider range of types
Posted by lc...@apache.org.
[BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types
This closes #2910
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/947fa68a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/947fa68a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/947fa68a
Branch: refs/heads/master
Commit: 947fa68a53068a2eb0ce9955685f7a46cc543192
Parents: 6505988 f8e2cf8
Author: Luke Cwik <lc...@google.com>
Authored: Fri May 5 14:45:03 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri May 5 14:45:03 2017 -0700
----------------------------------------------------------------------
.../apache/beam/examples/complete/TfIdf.java | 2 +-
.../beam/examples/complete/TfIdfTest.java | 2 +-
.../beam/runners/core/SplittableParDo.java | 4 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 2 +-
.../beam/runners/direct/DirectRunnerTest.java | 10 -
.../org/apache/beam/sdk/coders/AtomicCoder.java | 14 -
.../org/apache/beam/sdk/coders/AvroCoder.java | 42 +-
.../apache/beam/sdk/coders/ByteArrayCoder.java | 8 -
.../sdk/coders/CannotProvideCoderException.java | 2 +-
.../java/org/apache/beam/sdk/coders/Coder.java | 4 +-
.../apache/beam/sdk/coders/CoderFactories.java | 292 ---------
.../apache/beam/sdk/coders/CoderFactory.java | 44 --
.../apache/beam/sdk/coders/CoderProvider.java | 19 +-
.../beam/sdk/coders/CoderProviderRegistrar.java | 42 ++
.../apache/beam/sdk/coders/CoderProviders.java | 240 +++----
.../apache/beam/sdk/coders/CoderRegistrar.java | 45 --
.../apache/beam/sdk/coders/CoderRegistry.java | 618 +++++++------------
.../apache/beam/sdk/coders/CollectionCoder.java | 9 -
.../org/apache/beam/sdk/coders/CustomCoder.java | 8 -
.../apache/beam/sdk/coders/DefaultCoder.java | 119 +++-
.../apache/beam/sdk/coders/IterableCoder.java | 9 -
.../beam/sdk/coders/IterableLikeCoder.java | 12 -
.../org/apache/beam/sdk/coders/KvCoder.java | 7 -
.../org/apache/beam/sdk/coders/ListCoder.java | 8 -
.../org/apache/beam/sdk/coders/MapCoder.java | 12 -
.../beam/sdk/coders/SerializableCoder.java | 57 +-
.../org/apache/beam/sdk/coders/SetCoder.java | 9 -
.../apache/beam/sdk/coders/VarLongCoder.java | 7 -
.../beam/sdk/transforms/CombineFnBase.java | 4 +-
.../apache/beam/sdk/transforms/CombineFns.java | 4 +-
.../org/apache/beam/sdk/transforms/Create.java | 76 ++-
.../apache/beam/sdk/transforms/PTransform.java | 4 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 8 +-
.../apache/beam/sdk/transforms/WithKeys.java | 4 +-
.../sdk/transforms/windowing/GlobalWindow.java | 6 -
.../transforms/windowing/IntervalWindow.java | 7 -
.../org/apache/beam/sdk/values/PCollection.java | 2 +-
.../beam/sdk/values/TimestampedValue.java | 4 -
.../beam/sdk/coders/CoderFactoriesTest.java | 100 ---
.../beam/sdk/coders/CoderProvidersTest.java | 82 ++-
.../beam/sdk/coders/CoderRegistryTest.java | 167 ++---
.../beam/sdk/coders/DefaultCoderTest.java | 65 +-
.../beam/sdk/coders/IterableCoderTest.java | 17 -
.../apache/beam/sdk/coders/ListCoderTest.java | 17 -
.../apache/beam/sdk/coders/MapCoderTest.java | 20 -
.../beam/sdk/coders/SerializableCoderTest.java | 11 +
.../beam/sdk/transforms/CombineFnsTest.java | 10 +-
.../apache/beam/sdk/transforms/CreateTest.java | 6 +-
.../sdk/transforms/FlatMapElementsTest.java | 4 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 4 +-
.../beam/sdk/transforms/MapElementsTest.java | 4 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 17 +-
.../transforms/reflect/DoFnInvokersTest.java | 4 +-
.../apache/beam/sdk/values/TypedPValueTest.java | 28 +-
.../sdk/extensions/protobuf/ProtoCoder.java | 72 ++-
.../ProtobufCoderProviderRegistrar.java | 41 ++
.../protobuf/ProtobufCoderRegistrar.java | 39 --
.../sdk/extensions/protobuf/ProtoCoderTest.java | 7 +-
.../BigQueryCoderProviderRegistrar.java | 40 ++
.../io/gcp/bigquery/BigQueryCoderRegistrar.java | 39 --
.../io/gcp/bigquery/DynamicDestinations.java | 2 +-
.../pubsub/PubsubCoderProviderRegistrar.java | 37 ++
.../sdk/io/gcp/pubsub/PubsubCoderRegistrar.java | 35 --
.../BigQueryCoderProviderRegistrarTest.java | 40 ++
.../bigquery/BigQueryCoderRegistrarTest.java | 40 --
sdks/java/io/hadoop-common/pom.xml | 7 +
.../beam/sdk/io/hadoop/WritableCoder.java | 56 ++
.../beam/sdk/io/hadoop/WritableCoderTest.java | 10 +
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +-
.../beam/sdk/transforms/DistinctJava8Test.java | 8 -
.../beam/sdk/transforms/FilterJava8Test.java | 2 +-
.../beam/sdk/transforms/PartitionJava8Test.java | 2 +-
.../beam/sdk/transforms/WithKeysJava8Test.java | 9 -
73 files changed, 1121 insertions(+), 1689 deletions(-)
----------------------------------------------------------------------
[3/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow
creating coders through CoderFactory for a wider range of types
Posted by lc...@apache.org.
[BEAM-2174] Update CoderRegistry to allow creating coders through CoderFactory for a wider range of types
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f8e2cf89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f8e2cf89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f8e2cf89
Branch: refs/heads/master
Commit: f8e2cf89febeb2f86d2dc91c8d3fff5d43df3623
Parents: 6505988
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 4 19:55:47 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri May 5 14:44:32 2017 -0700
----------------------------------------------------------------------
.../apache/beam/examples/complete/TfIdf.java | 2 +-
.../beam/examples/complete/TfIdfTest.java | 2 +-
.../beam/runners/core/SplittableParDo.java | 4 +-
.../CopyOnAccessInMemoryStateInternalsTest.java | 2 +-
.../beam/runners/direct/DirectRunnerTest.java | 10 -
.../org/apache/beam/sdk/coders/AtomicCoder.java | 14 -
.../org/apache/beam/sdk/coders/AvroCoder.java | 42 +-
.../apache/beam/sdk/coders/ByteArrayCoder.java | 8 -
.../sdk/coders/CannotProvideCoderException.java | 2 +-
.../java/org/apache/beam/sdk/coders/Coder.java | 4 +-
.../apache/beam/sdk/coders/CoderFactories.java | 292 ---------
.../apache/beam/sdk/coders/CoderFactory.java | 44 --
.../apache/beam/sdk/coders/CoderProvider.java | 19 +-
.../beam/sdk/coders/CoderProviderRegistrar.java | 42 ++
.../apache/beam/sdk/coders/CoderProviders.java | 240 +++----
.../apache/beam/sdk/coders/CoderRegistrar.java | 45 --
.../apache/beam/sdk/coders/CoderRegistry.java | 618 +++++++------------
.../apache/beam/sdk/coders/CollectionCoder.java | 9 -
.../org/apache/beam/sdk/coders/CustomCoder.java | 8 -
.../apache/beam/sdk/coders/DefaultCoder.java | 119 +++-
.../apache/beam/sdk/coders/IterableCoder.java | 9 -
.../beam/sdk/coders/IterableLikeCoder.java | 12 -
.../org/apache/beam/sdk/coders/KvCoder.java | 7 -
.../org/apache/beam/sdk/coders/ListCoder.java | 8 -
.../org/apache/beam/sdk/coders/MapCoder.java | 12 -
.../beam/sdk/coders/SerializableCoder.java | 57 +-
.../org/apache/beam/sdk/coders/SetCoder.java | 9 -
.../apache/beam/sdk/coders/VarLongCoder.java | 7 -
.../beam/sdk/transforms/CombineFnBase.java | 4 +-
.../apache/beam/sdk/transforms/CombineFns.java | 4 +-
.../org/apache/beam/sdk/transforms/Create.java | 76 ++-
.../apache/beam/sdk/transforms/PTransform.java | 4 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 8 +-
.../apache/beam/sdk/transforms/WithKeys.java | 4 +-
.../sdk/transforms/windowing/GlobalWindow.java | 6 -
.../transforms/windowing/IntervalWindow.java | 7 -
.../org/apache/beam/sdk/values/PCollection.java | 2 +-
.../beam/sdk/values/TimestampedValue.java | 4 -
.../beam/sdk/coders/CoderFactoriesTest.java | 100 ---
.../beam/sdk/coders/CoderProvidersTest.java | 82 ++-
.../beam/sdk/coders/CoderRegistryTest.java | 167 ++---
.../beam/sdk/coders/DefaultCoderTest.java | 65 +-
.../beam/sdk/coders/IterableCoderTest.java | 17 -
.../apache/beam/sdk/coders/ListCoderTest.java | 17 -
.../apache/beam/sdk/coders/MapCoderTest.java | 20 -
.../beam/sdk/coders/SerializableCoderTest.java | 11 +
.../beam/sdk/transforms/CombineFnsTest.java | 10 +-
.../apache/beam/sdk/transforms/CreateTest.java | 6 +-
.../sdk/transforms/FlatMapElementsTest.java | 4 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 4 +-
.../beam/sdk/transforms/MapElementsTest.java | 4 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 17 +-
.../transforms/reflect/DoFnInvokersTest.java | 4 +-
.../apache/beam/sdk/values/TypedPValueTest.java | 28 +-
.../sdk/extensions/protobuf/ProtoCoder.java | 72 ++-
.../ProtobufCoderProviderRegistrar.java | 41 ++
.../protobuf/ProtobufCoderRegistrar.java | 39 --
.../sdk/extensions/protobuf/ProtoCoderTest.java | 7 +-
.../BigQueryCoderProviderRegistrar.java | 40 ++
.../io/gcp/bigquery/BigQueryCoderRegistrar.java | 39 --
.../io/gcp/bigquery/DynamicDestinations.java | 2 +-
.../pubsub/PubsubCoderProviderRegistrar.java | 37 ++
.../sdk/io/gcp/pubsub/PubsubCoderRegistrar.java | 35 --
.../BigQueryCoderProviderRegistrarTest.java | 40 ++
.../bigquery/BigQueryCoderRegistrarTest.java | 40 --
sdks/java/io/hadoop-common/pom.xml | 7 +
.../beam/sdk/io/hadoop/WritableCoder.java | 56 ++
.../beam/sdk/io/hadoop/WritableCoderTest.java | 10 +
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +-
.../beam/sdk/transforms/DistinctJava8Test.java | 8 -
.../beam/sdk/transforms/FilterJava8Test.java | 2 +-
.../beam/sdk/transforms/PartitionJava8Test.java | 2 +-
.../beam/sdk/transforms/WithKeysJava8Test.java | 9 -
73 files changed, 1121 insertions(+), 1689 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 6fd9755..7552b94 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -409,7 +409,7 @@ public class TfIdf {
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+ pipeline.getCoderRegistry().registerCoderForClass(URI.class, StringDelegateCoder.of(URI.class));
pipeline
.apply(new ReadDocuments(listInputDocuments(options)))
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
index d263643..3681ff5 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java
@@ -48,7 +48,7 @@ public class TfIdfTest {
@Category(ValidatesRunner.class)
public void testTfIdf() throws Exception {
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+ pipeline.getCoderRegistry().registerCoderForClass(URI.class, StringDelegateCoder.of(URI.class));
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
.apply(Create.of(
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index a4d9639..6503fa2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -274,7 +274,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
input,
PCollection<T> output)
throws CannotProvideCoderException {
- // Similar logic to ParDo.MultiOutput.getDefaultOutputCoder.
+ // Similar logic to ParDo.MultiOutput.getOutputCoder.
@SuppressWarnings("unchecked")
KeyedWorkItemCoder<String, ElementAndRestriction<InputT, RestrictionT>> kwiCoder =
(KeyedWorkItemCoder) input.getCoder();
@@ -284,7 +284,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
return input
.getPipeline()
.getCoderRegistry()
- .getDefaultCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder);
+ .getCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 46f26a1..1e60ca3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -229,7 +229,7 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CoderRegistry reg = pipeline.getCoderRegistry();
StateTag<CombiningState<Long, long[], Long>> stateTag =
StateTags.combiningValue("summer",
- sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
+ sumLongFn.getAccumulatorCoder(reg, reg.getCoder(Long.class)), sumLongFn);
GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(0L));
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 0fe9585..85e55eb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -577,14 +577,4 @@ public class DirectRunnerTest implements Serializable {
return underlying.getDefaultOutputCoder();
}
}
-
- @Test
- public void fallbackCoderProviderAllowsInference() {
- // See https://issues.apache.org/jira/browse/BEAM-1642
- Pipeline p = getPipeline();
- p.getCoderRegistry().setFallbackCoderProvider(
- org.apache.beam.sdk.coders.AvroCoder.PROVIDER);
- p.apply(Create.of(Arrays.asList(100, 200))).apply(Count.<Integer>globally());
- p.run().waitUntilFinish();
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
index 528cfb0..043fe93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
@@ -33,20 +33,6 @@ import java.util.List;
*/
public abstract class AtomicCoder<T> extends StructuredCoder<T> {
/**
- * Returns an empty list.
- *
- * <p>{@link CoderFactories#fromStaticMethods(Class)} builds a {@link CoderFactory} from the
- * {@code #of()} method and this method, used to determine the components of an object. Because
- * {@link AtomicCoder} has no components, always returns an empty list.
- *
- * @param exampleValue unused, but part of the latent interface expected by {@link
- * CoderFactories#fromStaticMethods}
- */
- public static <T> List<Object> getInstanceComponents(T exampleValue) {
- return Collections.emptyList();
- }
-
- /**
* {@inheritDoc}.
*
* @throws NonDeterministicException
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 2aa2b44..f82c065 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import javax.annotation.Nullable;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
@@ -140,18 +141,39 @@ public class AvroCoder<T> extends CustomCoder<T> {
return new AvroCoder<>(type, schema);
}
- public static final CoderProvider PROVIDER = new CoderProvider() {
+ /**
+ * Returns a {@link CoderProvider} which uses the {@link AvroCoder} if possible for
+ * all types.
+ *
+ * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively
+ * accept dangerous types such as {@link Object}.
+ *
+ * <p>This method is invoked reflectively from {@link DefaultCoder}.
+ */
+ @SuppressWarnings("unused")
+ public static CoderProvider getCoderProvider() {
+ return new AvroCoderProvider();
+ }
+
+ /**
+ * A {@link CoderProvider} that constructs an {@link AvroCoder} for Avro compatible classes.
+ *
+ * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively
+ * accept dangerous types such as {@link Object}.
+ */
+ static class AvroCoderProvider extends CoderProvider {
@Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor) {
- // This is a downcast from `? super T` to T. However, because
- // it comes from a TypeDescriptor<T>, the class object itself
- // is the same so the supertype in question shares the same
- // generated AvroCoder schema.
- @SuppressWarnings("unchecked")
- Class<T> rawType = (Class<T>) typeDescriptor.getRawType();
- return AvroCoder.of(rawType);
+ public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+ List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+ try {
+ return AvroCoder.of(typeDescriptor);
+ } catch (AvroRuntimeException e) {
+ throw new CannotProvideCoderException(
+ String.format("%s is not compatible with Avro", typeDescriptor),
+ e);
+ }
}
- };
+ }
private final Class<T> type;
private final SerializableSchemaSupplier schemaSupplier;
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index 28cb627..d83d832 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -21,7 +21,6 @@ import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
import org.apache.beam.sdk.util.StreamUtils;
@@ -45,13 +44,6 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
return INSTANCE;
}
- /**
- * Returns an empty list. {@link ByteArrayCoder} has no components.
- */
- public static <T> List<Object> getInstanceComponents(T ignored) {
- return Collections.emptyList();
- }
-
/////////////////////////////////////////////////////////////////////////////
private static final ByteArrayCoder INSTANCE = new ByteArrayCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
index c37ec00..bc2ef3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CannotProvideCoderException.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.coders;
/**
- * The exception thrown when a {@link CoderProvider} cannot
+ * The exception thrown when a {@link CoderRegistry} or {@link CoderProvider} cannot
* provide a {@link Coder} that has been requested.
*/
public class CannotProvideCoderException extends Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 41e83ac..eeafbd2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -51,9 +51,9 @@ import org.apache.beam.sdk.values.TypeDescriptor;
*
* <p>{@link Coder} classes for compound types are often composed from coder classes for types
* contains therein. The composition of {@link Coder} instances into a coder for the compound
- * class is the subject of the {@link CoderFactory} type, which enables automatic generic
+ * class is the subject of the {@link CoderProvider} type, which enables automatic generic
* composition of {@link Coder} classes within the {@link CoderRegistry}. With particular
- * static methods on a compound {@link Coder} class, a {@link CoderFactory} can be automatically
+ * static methods on a compound {@link Coder} class, a {@link CoderProvider} can be automatically
* inferred. See {@link KvCoder} for an example of a simple compound {@link Coder} that supports
* automatic composition in the {@link CoderRegistry}.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
deleted file mode 100644
index 4f05c95..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.MoreObjects;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * Static utility methods for creating and working with {@link Coder}s.
- */
-public final class CoderFactories {
- private CoderFactories() { } // Static utility class
-
- /**
- * Creates a {@link CoderFactory} built from particular static methods of a class that
- * implements {@link Coder}.
- *
- * <p>The class must have the following static methods:
- *
- * <ul>
- * <li> {@code
- * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...)
- * }
- * <li> {@code
- * public static List<Object> getInstanceComponents(T exampleValue);
- * }
- * </ul>
- *
- * <p>The {@code of(...)} method will be used to construct a
- * {@code Coder<T>} from component {@link Coder}s.
- * It must accept one {@link Coder} argument for each
- * generic type parameter of {@code T}. If {@code T} takes no generic
- * type parameters, then the {@code of()} factory method should take
- * no arguments.
- *
- * <p>The {@code getInstanceComponents} method will be used to
- * decompose a value during the {@link Coder} inference process,
- * to automatically choose coders for the components.
- *
- * <p>Note that the class {@code T} to be coded may be a
- * not-yet-specialized generic class.
- * For a generic class {@code MyClass<X>} and an actual type parameter
- * {@code Foo}, the {@link CoderFactoryFromStaticMethods} will
- * accept any {@code Coder<Foo>} and produce a {@code Coder<MyClass<Foo>>}.
- *
- * <p>For example, the {@link CoderFactory} returned by
- * {@code fromStaticMethods(ListCoder.class)}
- * will produce a {@code Coder<List<X>>} for any {@code Coder Coder<X>}.
- */
- public static <T> CoderFactory fromStaticMethods(Class<T> clazz) {
- checkArgument(
- Coder.class.isAssignableFrom(clazz),
- "%s is not a subtype of %s",
- clazz.getName(),
- Coder.class.getSimpleName());
- return new CoderFactoryFromStaticMethods((Class<? extends Coder>) clazz);
- }
-
- /**
- * Creates a {@link CoderFactory} that always returns the
- * given coder.
- *
- * <p>The {@code getInstanceComponents} method of this
- * {@link CoderFactory} always returns an empty list.
- */
- public static <T> CoderFactory forCoder(Coder<T> coder) {
- return new CoderFactoryForCoder<>(coder);
- }
-
- /**
- * See {@link #fromStaticMethods} for a detailed description
- * of the characteristics of this {@link CoderFactory}.
- */
- private static class CoderFactoryFromStaticMethods implements CoderFactory {
-
- @Override
- @SuppressWarnings("rawtypes")
- public Coder<?> create(List<? extends Coder<?>> componentCoders) {
- try {
- return (Coder) factoryMethod.invoke(
- null /* static */, componentCoders.toArray());
- } catch (IllegalAccessException
- | IllegalArgumentException
- | InvocationTargetException
- | NullPointerException
- | ExceptionInInitializerError exn) {
- throw new IllegalStateException(
- "error when invoking Coder factory method " + factoryMethod,
- exn);
- }
- }
-
- @Override
- public List<Object> getInstanceComponents(Object value) {
- try {
- @SuppressWarnings("unchecked")
- List<Object> components = (List<Object>) getComponentsMethod.invoke(
- null /* static */, value);
- return components;
- } catch (IllegalAccessException
- | IllegalArgumentException
- | InvocationTargetException
- | NullPointerException
- | ExceptionInInitializerError exn) {
- throw new IllegalStateException(
- "error when invoking Coder getComponents method " + getComponentsMethod,
- exn);
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////
-
- // Method to create a coder given component coders
- // For a Coder class of kind * -> * -> ... n times ... -> *
- // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T>
- private final Method factoryMethod;
-
- // Method to decompose a value of type T into its parts.
- // For a Coder class of kind * -> * -> ... n times ... -> *
- // this has type T -> List<Object>
- // where the list has n elements.
- private final Method getComponentsMethod;
-
- /**
- * Returns a CoderFactory that invokes the given static factory method
- * to create the Coder.
- */
- private CoderFactoryFromStaticMethods(Class<? extends Coder> coderClazz) {
- this.factoryMethod = getFactoryMethod(coderClazz);
- this.getComponentsMethod = getInstanceComponentsMethod(coderClazz);
- }
-
- /**
- * Returns the static {@code of} constructor method on {@code coderClazz}
- * if it exists. It is assumed to have one {@link Coder} parameter for
- * each type parameter of {@code coderClazz}.
- */
- private Method getFactoryMethod(Class<?> coderClazz) {
- Method factoryMethodCandidate;
-
- // Find the static factory method of coderClazz named 'of' with
- // the appropriate number of type parameters.
- int numTypeParameters = coderClazz.getTypeParameters().length;
- Class<?>[] factoryMethodArgTypes = new Class<?>[numTypeParameters];
- Arrays.fill(factoryMethodArgTypes, Coder.class);
- try {
- factoryMethodCandidate =
- coderClazz.getDeclaredMethod("of", factoryMethodArgTypes);
- } catch (NoSuchMethodException | SecurityException exn) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "does not have an accessible method named 'of' with "
- + numTypeParameters + " arguments of Coder type",
- exn);
- }
- if (!Modifier.isStatic(factoryMethodCandidate.getModifiers())) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "method named 'of' with " + numTypeParameters
- + " arguments of Coder type is not static");
- }
- if (!coderClazz.isAssignableFrom(factoryMethodCandidate.getReturnType())) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "method named 'of' with " + numTypeParameters
- + " arguments of Coder type does not return a " + coderClazz);
- }
- try {
- if (!factoryMethodCandidate.isAccessible()) {
- factoryMethodCandidate.setAccessible(true);
- }
- } catch (SecurityException exn) {
- throw new IllegalArgumentException(
- "cannot register Coder " + coderClazz + ": "
- + "method named 'of' with " + numTypeParameters
- + " arguments of Coder type is not accessible",
- exn);
- }
-
- return factoryMethodCandidate;
- }
-
- /**
- * Finds the static method on {@code coderType} to use
- * to decompose a value of type {@code T} into components,
- * each corresponding to an argument of the {@code of}
- * method.
- */
- private <T, CoderT extends Coder> Method getInstanceComponentsMethod(Class<CoderT> coderClazz) {
- TypeDescriptor<CoderT> coderType = TypeDescriptor.of(coderClazz);
- TypeDescriptor<T> argumentType = getCodedType(coderType);
-
- // getInstanceComponents may be implemented in a superclass,
- // so we search them all for an applicable method. We do not
- // try to be clever about finding the best overload. It may
- // be in a generic superclass, erased to accept an Object.
- // However, subtypes are listed before supertypes (it is a
- // topological ordering) so probably the best one will be chosen
- // if there are more than one (which should be rare)
- for (TypeDescriptor<?> supertype : coderType.getClasses()) {
- for (Method method : supertype.getRawType().getDeclaredMethods()) {
- if (method.getName().equals("getInstanceComponents")) {
- TypeDescriptor<?> formalArgumentType = supertype.getArgumentTypes(method).get(0);
- if (formalArgumentType.getRawType().isAssignableFrom(argumentType.getRawType())) {
- return method;
- }
- }
- }
- }
-
- throw new IllegalArgumentException(
- "cannot create a CoderFactory from " + coderType + ": "
- + "does not have an accessible method "
- + "'getInstanceComponents'");
- }
-
- /**
- * If {@code coderType} is a subclass of {@link Coder} for a specific type {@code T}, returns
- * {@code T.class}. Otherwise, raises IllegalArgumentException.
- */
- private <T, CoderT extends Coder> TypeDescriptor<T> getCodedType(
- TypeDescriptor<CoderT> coderType) {
- TypeDescriptor<? super CoderT> coderSupertype = coderType.getSupertype(Coder.class);
- ParameterizedType coderIface = (ParameterizedType) coderSupertype.getType();
- @SuppressWarnings("unchecked")
- TypeDescriptor<T> token =
- (TypeDescriptor<T>) TypeDescriptor.of(coderIface.getActualTypeArguments()[0]);
- return token;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("factoryMethod", factoryMethod)
- .add("getComponentsMethod", getComponentsMethod)
- .toString();
- }
- }
-
- /**
- * See {@link #forCoder} for a detailed description of this
- * {@link CoderFactory}.
- */
- private static class CoderFactoryForCoder<T> implements CoderFactory {
- private final Coder<T> coder;
-
- public CoderFactoryForCoder(Coder<T> coder) {
- this.coder = coder;
- }
-
- @Override
- public Coder<?> create(List<? extends Coder<?>> componentCoders) {
- return this.coder;
- }
-
- @Override
- public List<Object> getInstanceComponents(Object value) {
- return Collections.emptyList();
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("coder", coder)
- .toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
deleted file mode 100644
index 22d03fa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import java.util.List;
-
-/**
- * A {@link CoderFactory} creates coders and decomposes values.
- * It may operate on a parameterized type, such as {@link List},
- * in which case the {@link #create} method accepts a list of
- * coders to use for the type parameters.
- */
-public interface CoderFactory {
-
- /**
- * Returns a {@code Coder<?>}, given argument coder to use for
- * values of a particular type, given the Coders for each of
- * the type's generic parameter types.
- */
- Coder<?> create(List<? extends Coder<?>> componentCoders);
-
- /**
- * Returns a list of objects contained in {@code value}, one per
- * type argument, or {@code null} if none can be determined.
- * The list of returned objects should be the same size as the
- * list of coders required by {@link #create}.
- */
- List<Object> getInstanceComponents(Object value);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
index 0db73eb..ac042ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProvider.java
@@ -17,18 +17,25 @@
*/
package org.apache.beam.sdk.coders;
+import java.util.List;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
- * A {@link CoderProvider} may create a {@link Coder} for
- * any concrete class.
+ * A {@link CoderProvider} provides {@link Coder}s.
+ *
+ * <p>It may operate on a parameterized type, such as {@link List}, in which case the
+ * {@link #coderFor} method accepts a list of coders to use for the type parameters.
*/
-public interface CoderProvider {
+public abstract class CoderProvider {
/**
- * Provides a coder for a given class, if possible.
+ * Returns a {@code Coder<T>} to use for values of a particular type, given the Coders for each of
+ * the type's generic parameter types.
*
- * @throws CannotProvideCoderException if no coder can be provided
+ * <p>Throws {@link CannotProvideCoderException} if this {@link CoderProvider} cannot provide
+ * a coder for this type and components.
*/
- <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException;
+ public abstract <T> Coder<T> coderFor(
+ TypeDescriptor<T> typeDescriptor, List<? extends Coder<?>> componentCoders)
+ throws CannotProvideCoderException;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
new file mode 100644
index 0000000..35d061d
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviderRegistrar.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import com.google.auto.service.AutoService;
+import java.util.List;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * {@link Coder} creators have the ability to automatically have their
+ * {@link Coder coders} registered with this SDK by creating a {@link ServiceLoader} entry
+ * and a concrete implementation of this interface.
+ *
+ * <p>It is optional but recommended to use one of the many build time tools such as
+ * {@link AutoService} to generate the necessary META-INF files automatically.
+ */
+@Experimental
+public interface CoderProviderRegistrar {
+ /**
+ * Returns a list of {@link CoderProvider coder providers} which
+ * will be registered by default within each {@link CoderRegistry coder registry} instance.
+ *
+ * <p>See {@link CoderProviders} for convenience methods to construct a {@link CoderProvider}.
+ */
+ List<CoderProvider> getCoderProviders();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
index c072008..414fd8b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
@@ -19,146 +19,178 @@ package org.apache.beam.sdk.coders;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import com.google.common.base.MoreObjects;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
import java.util.List;
-import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
- * Static utility methods for working with {@link CoderProvider CoderProviders}.
+ * Static utility methods for creating and working with {@link CoderProvider}s.
*/
public final class CoderProviders {
-
- // Static utility class
- private CoderProviders() { }
+ private CoderProviders() { } // Static utility class
/**
- * Creates a {@link CoderProvider} built from particular static methods of a class that
- * implements {@link Coder}. The requirements for this method are precisely the requirements
- * for a {@link Coder} class to be usable with {@link DefaultCoder} annotations.
- *
- * <p>The class must have the following static method:
- *
- * <pre>{@code
- * public static Coder<T> of(TypeDescriptor<T> type)
- * }
- * </pre>
+ * Creates a {@link CoderProvider} from a class's
+ * {@code static <T> Coder<T> of(TypeDescriptor<T>, List<Coder<?>>}) method.
*/
- public static <T> CoderProvider fromStaticMethods(Class<T> clazz) {
- return new CoderProviderFromStaticMethods(clazz);
+ public static CoderProvider fromStaticMethods(Class<?> rawType, Class<?> coderClazz) {
+ checkArgument(
+ Coder.class.isAssignableFrom(coderClazz),
+ "%s is not a subtype of %s",
+ coderClazz.getName(),
+ Coder.class.getSimpleName());
+ return new CoderProviderFromStaticMethods(rawType, coderClazz);
}
-
/**
- * Returns a {@link CoderProvider} that consults each of the provider {@code coderProviders}
- * and returns the first {@link Coder} provided.
- *
- * <p>Note that the order in which the providers are listed matters: While the set of types
- * handled will be the union of those handled by all of the providers in the list, the actual
- * {@link Coder} provided by the first successful provider may differ, and may have inferior
- * properties. For example, not all {@link Coder Coders} are deterministic, handle {@code null}
- * values, or have comparable performance.
+ * Creates a {@link CoderProvider} that always returns the
+ * given coder for the specified type.
*/
- public static CoderProvider firstOf(CoderProvider... coderProviders) {
- return new FirstOf(ImmutableList.copyOf(coderProviders));
+ public static CoderProvider forCoder(TypeDescriptor<?> type, Coder<?> coder) {
+ return new CoderProviderForCoder(type, coder);
}
- ///////////////////////////////////////////////////////////////////////////////////////////////
-
/**
- * @see #firstOf
+ * See {@link #fromStaticMethods} for a detailed description
+ * of the characteristics of this {@link CoderProvider}.
*/
- private static class FirstOf implements CoderProvider {
-
- private Iterable<CoderProvider> providers;
-
- public FirstOf(Iterable<CoderProvider> providers) {
- this.providers = providers;
- }
+ private static class CoderProviderFromStaticMethods extends CoderProvider {
@Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
- List<String> messages = Lists.newArrayList();
- for (CoderProvider provider : providers) {
- try {
- return provider.getCoder(type);
- } catch (CannotProvideCoderException exc) {
- messages.add(String.format("%s could not provide a Coder for type %s: %s",
- provider, type, exc.getMessage()));
- }
+ public <T> Coder<T> coderFor(TypeDescriptor<T> type, List<? extends Coder<?>> componentCoders)
+ throws CannotProvideCoderException {
+ if (!this.rawType.equals(type.getRawType())) {
+ throw new CannotProvideCoderException(String.format(
+ "Unable to provide coder for %s, this factory can only provide coders for %s",
+ type,
+ this.rawType));
+ }
+ try {
+ return (Coder) factoryMethod.invoke(
+ null /* static */, componentCoders.toArray());
+ } catch (IllegalAccessException
+ | IllegalArgumentException
+ | InvocationTargetException
+ | NullPointerException
+ | ExceptionInInitializerError exn) {
+ throw new IllegalStateException(
+ "error when invoking Coder factory method " + factoryMethod,
+ exn);
}
- throw new CannotProvideCoderException(
- String.format("Cannot provide coder for type %s: %s.",
- type, Joiner.on("; ").join(messages)));
}
- }
- private static class CoderProviderFromStaticMethods implements CoderProvider {
+ ////////////////////////////////////////////////////////////////////////////////
- /** If true, then clazz has {@code of(TypeDescriptor)}. If false, {@code of(Class)}. */
- private final boolean takesTypeDescriptor;
- private final Class<?> clazz;
+ // Type raw type used to filter the incoming type on.
+ private final Class<?> rawType;
- public CoderProviderFromStaticMethods(Class<?> clazz) {
- // Note that the second condition supports older classes, which only needed to provide
- // of(Class), not of(TypeDescriptor). Our own classes have updated to accept a
- // TypeDescriptor. Hence the error message points only to the current specification,
- // not both acceptable conditions.
- checkArgument(classTakesTypeDescriptor(clazz) || classTakesClass(clazz),
- "Class " + clazz.getCanonicalName()
- + " is missing required static method of(TypeDescriptor).");
+ // Method to create a coder given component coders
+ // For a Coder class of kind * -> * -> ... n times ... -> *
+ // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T>
+ private final Method factoryMethod;
- this.takesTypeDescriptor = classTakesTypeDescriptor(clazz);
- this.clazz = clazz;
+ /**
+ * Returns a CoderProvider that invokes the given static factory method
+ * to create the Coder.
+ */
+ private CoderProviderFromStaticMethods(Class<?> rawType, Class<?> coderClazz) {
+ this.rawType = rawType;
+ this.factoryMethod = getFactoryMethod(coderClazz);
}
- @Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
+ /**
+ * Returns the static {@code of} constructor method on {@code coderClazz}
+ * if it exists. It is assumed to have one {@link Coder} parameter for
+ * each type parameter of {@code coderClazz}.
+ */
+ private Method getFactoryMethod(Class<?> coderClazz) {
+ Method factoryMethodCandidate;
+
+ // Find the static factory method of coderClazz named 'of' with
+ // the appropriate number of type parameters.
+ int numTypeParameters = coderClazz.getTypeParameters().length;
+ Class<?>[] factoryMethodArgTypes = new Class<?>[numTypeParameters];
+ Arrays.fill(factoryMethodArgTypes, Coder.class);
try {
- if (takesTypeDescriptor) {
- @SuppressWarnings("unchecked")
- Coder<T> result = InstanceBuilder.ofType(Coder.class)
- .fromClass(clazz)
- .fromFactoryMethod("of")
- .withArg(TypeDescriptor.class, type)
- .build();
- return result;
- } else {
- @SuppressWarnings("unchecked")
- Coder<T> result = InstanceBuilder.ofType(Coder.class)
- .fromClass(clazz)
- .fromFactoryMethod("of")
- .withArg(Class.class, type.getRawType())
- .build();
- return result;
- }
- } catch (RuntimeException exc) {
- if (exc.getCause() instanceof InvocationTargetException) {
- throw new CannotProvideCoderException(exc.getCause().getCause());
+ factoryMethodCandidate =
+ coderClazz.getDeclaredMethod("of", factoryMethodArgTypes);
+ } catch (NoSuchMethodException | SecurityException exn) {
+ throw new IllegalArgumentException(
+ "cannot register Coder " + coderClazz + ": "
+ + "does not have an accessible method named 'of' with "
+ + numTypeParameters + " arguments of Coder type",
+ exn);
+ }
+ if (!Modifier.isStatic(factoryMethodCandidate.getModifiers())) {
+ throw new IllegalArgumentException(
+ "cannot register Coder " + coderClazz + ": "
+ + "method named 'of' with " + numTypeParameters
+ + " arguments of Coder type is not static");
+ }
+ if (!coderClazz.isAssignableFrom(factoryMethodCandidate.getReturnType())) {
+ throw new IllegalArgumentException(
+ "cannot register Coder " + coderClazz + ": "
+ + "method named 'of' with " + numTypeParameters
+ + " arguments of Coder type does not return a " + coderClazz);
+ }
+ try {
+ if (!factoryMethodCandidate.isAccessible()) {
+ factoryMethodCandidate.setAccessible(true);
}
- throw exc;
+ } catch (SecurityException exn) {
+ throw new IllegalArgumentException(
+ "cannot register Coder " + coderClazz + ": "
+ + "method named 'of' with " + numTypeParameters
+ + " arguments of Coder type is not accessible",
+ exn);
}
+
+ return factoryMethodCandidate;
}
- private boolean classTakesTypeDescriptor(Class<?> clazz) {
- try {
- clazz.getDeclaredMethod("of", TypeDescriptor.class);
- return true;
- } catch (NoSuchMethodException | SecurityException exc) {
- return false;
- }
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("rawType", rawType)
+ .add("factoryMethod", factoryMethod)
+ .toString();
}
+ }
- private boolean classTakesClass(Class<?> clazz) {
- try {
- clazz.getDeclaredMethod("of", Class.class);
- return true;
- } catch (NoSuchMethodException | SecurityException exc) {
- return false;
+ /**
+ * See {@link #forCoder} for a detailed description of this {@link CoderProvider}.
+ */
+ private static class CoderProviderForCoder extends CoderProvider {
+ private final Coder<?> coder;
+ private final TypeDescriptor<?> type;
+
+ public CoderProviderForCoder(TypeDescriptor<?> type, Coder<?> coder){
+ this.type = type;
+ this.coder = coder;
+ }
+
+ @Override
+ public <T> Coder<T> coderFor(TypeDescriptor<T> type, List<? extends Coder<?>> componentCoders)
+ throws CannotProvideCoderException {
+ if (!this.type.equals(type)) {
+ throw new CannotProvideCoderException(String.format(
+ "Unable to provide coder for %s, this factory can only provide coders for %s",
+ type,
+ this.type));
}
+ return (Coder) coder;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("type", type)
+ .add("coder", coder)
+ .toString();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
deleted file mode 100644
index fced976..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistrar.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import java.util.ServiceLoader;
-import org.apache.beam.sdk.annotations.Experimental;
-
-/**
- * {@link Coder} creators have the ability to automatically have their
- * {@link Coder coders} registered with this SDK by creating a {@link ServiceLoader} entry
- * and a concrete implementation of this interface.
- *
- * <p>It is optional but recommended to use one of the many build time tools such as
- * {@link AutoService} to generate the necessary META-INF files automatically.
- */
-@Experimental
-public interface CoderRegistrar {
- /**
- * Returns a mapping of {@link Class classes} to {@link CoderFactory coder factories} which
- * will be registered by default within each {@link CoderRegistry coder registry} instance.
- *
- * <p>See {@link CoderFactories} for convenience methods to construct a {@link CoderFactory}.
- *
- * <p>Note that a warning is logged if multiple {@link CoderRegistrar coder registrars} provide
- * mappings for the same {@link Class}.
- */
- Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses();
-}
[2/4] beam git commit: [BEAM-2174] Update CoderRegistry to allow
creating coders through CoderFactory for a wider range of types
Posted by lc...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 7f0f632..2ba548a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -20,12 +20,10 @@ package org.apache.beam.sdk.coders;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
@@ -37,12 +35,13 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
-import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -57,102 +56,118 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A {@link CoderRegistry} allows registering the default {@link Coder} to use for a Java class,
- * and looking up and instantiating the default {@link Coder} for a Java type.
+ * A {@link CoderRegistry} allows creating a {@link Coder} for a given Java {@link Class class} or
+ * {@link TypeDescriptor type descriptor}.
*
- * <p>{@link CoderRegistry} uses the following mechanisms to determine a default {@link Coder} for a
- * Java class, in order of precedence:
- * <ol>
- * <li>Registration:
- * <ul>
- * <li>A {@link CoderFactory} can be registered to handle a particular class via
- * {@link #registerCoder(Class, CoderFactory)}.</li>
- * <li>A {@link Coder} class with the static methods to satisfy
- * {@link CoderFactories#fromStaticMethods} can be registered via
- * {@link #registerCoder(Class, Class)}.</li>
- * <li>Types can be automatically registered via {@link CoderRegistrar coder registrars}.</li>
- * </ul>
- * <li>Annotations: {@link DefaultCoder} can be used to annotate a type with
- * the default {@code Coder} type. The {@link Coder} class must satisfy the requirements
- * of {@link CoderProviders#fromStaticMethods}.
- * <li>Fallback: A fallback {@link CoderProvider} is used to attempt to provide a {@link Coder}
- * for any type. By default, there is {@link SerializableCoder#PROVIDER}, which can provide a
- * {@link Coder} for any type that is serializable via Java serialization. The fallback
- * {@link CoderProvider} can be get and set respectively using
- * {@link #getFallbackCoderProvider()} and {@link #setFallbackCoderProvider}. Multiple
- * fallbacks can be chained together using {@link CoderProviders#firstOf}.
- * </ol>
+ * <p>Creation of the {@link Coder} is delegated to one of the many registered
+ * {@link CoderProvider coder providers} based upon the registration order.
+ *
+ * <p>By default, the {@link CoderProvider coder provider} precedence order is as follows:
+ * <ul>
+ * <li>Coder providers registered programmatically with
+ * {@link CoderRegistry#registerCoderProvider(CoderProvider)}.</li>
+ * <li>A default coder provider for common Java (Byte, Double, List, ...) and
+ * Apache Beam (KV, ...) types.</li>
+ * <li>Coder providers registered automatically through a {@link CoderProviderRegistrar} using
+ * a {@link ServiceLoader}. Note that the {@link ServiceLoader} registration order is
+ * consistent but may change due to the addition or removal of libraries exposed
+ * to the application. This can impact the coder returned if multiple coder providers
+ * are capable of supplying a coder for the specified type.</li>
+ * </ul>
+ *
+ * <p>Note that if multiple {@link CoderProvider coder providers} can provide a {@link Coder} for
+ * a given type, the precedence order above defines which {@link CoderProvider} is chosen.
*/
-public class CoderRegistry implements CoderProvider {
+public class CoderRegistry {
private static final Logger LOG = LoggerFactory.getLogger(CoderRegistry.class);
- private static final Map<Class<?>, CoderFactory> REGISTERED_CODER_FACTORIES_PER_CLASS;
+ private static final List<CoderProvider> REGISTERED_CODER_FACTORIES;
+
+ /** A {@link CoderProvider} for common Java SDK and Apache Beam SDK types. */
+ private static class CommonTypes extends CoderProvider {
+ private final Map<Class<?>, CoderProvider> commonTypesToCoderProviders;
+
+ private CommonTypes() {
+ ImmutableMap.Builder<Class<?>, CoderProvider> builder = ImmutableMap.builder();
+ builder.put(Byte.class,
+ CoderProviders.fromStaticMethods(Byte.class, ByteCoder.class));
+ builder.put(BitSet.class,
+ CoderProviders.fromStaticMethods(BitSet.class, BitSetCoder.class));
+ builder.put(Double.class,
+ CoderProviders.fromStaticMethods(Double.class, DoubleCoder.class));
+ builder.put(Instant.class,
+ CoderProviders.fromStaticMethods(Instant.class, InstantCoder.class));
+ builder.put(Integer.class,
+ CoderProviders.fromStaticMethods(Integer.class, VarIntCoder.class));
+ builder.put(Iterable.class,
+ CoderProviders.fromStaticMethods(Iterable.class, IterableCoder.class));
+ builder.put(KV.class,
+ CoderProviders.fromStaticMethods(KV.class, KvCoder.class));
+ builder.put(List.class,
+ CoderProviders.fromStaticMethods(List.class, ListCoder.class));
+ builder.put(Long.class,
+ CoderProviders.fromStaticMethods(Long.class, VarLongCoder.class));
+ builder.put(Map.class,
+ CoderProviders.fromStaticMethods(Map.class, MapCoder.class));
+ builder.put(Set.class,
+ CoderProviders.fromStaticMethods(Set.class, SetCoder.class));
+ builder.put(String.class,
+ CoderProviders.fromStaticMethods(String.class, StringUtf8Coder.class));
+ builder.put(TimestampedValue.class,
+ CoderProviders.fromStaticMethods(
+ TimestampedValue.class, TimestampedValue.TimestampedValueCoder.class));
+ builder.put(Void.class,
+ CoderProviders.fromStaticMethods(Void.class, VoidCoder.class));
+ builder.put(byte[].class,
+ CoderProviders.fromStaticMethods(byte[].class, ByteArrayCoder.class));
+ builder.put(IntervalWindow.class,
+ CoderProviders.forCoder(
+ TypeDescriptor.of(IntervalWindow.class), IntervalWindow.getCoder()));
+ commonTypesToCoderProviders = builder.build();
+ }
+
+ @Override
+ public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+ List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+ CoderProvider factory = commonTypesToCoderProviders.get(typeDescriptor.getRawType());
+ if (factory == null) {
+ throw new CannotProvideCoderException(
+ String.format("%s is not one of the common types.", typeDescriptor));
+ }
+ return factory.coderFor(typeDescriptor, componentCoders);
+ }
+ }
static {
- // Register the standard coders first so they are chosen as the default
- Multimap<Class<?>, CoderFactory> codersToRegister = HashMultimap.create();
- codersToRegister.put(Byte.class, CoderFactories.fromStaticMethods(ByteCoder.class));
- codersToRegister.put(BitSet.class, CoderFactories.fromStaticMethods(BitSetCoder.class));
- codersToRegister.put(Double.class, CoderFactories.fromStaticMethods(DoubleCoder.class));
- codersToRegister.put(Instant.class, CoderFactories.fromStaticMethods(InstantCoder.class));
- codersToRegister.put(Integer.class, CoderFactories.fromStaticMethods(VarIntCoder.class));
- codersToRegister.put(Iterable.class, CoderFactories.fromStaticMethods(IterableCoder.class));
- codersToRegister.put(KV.class, CoderFactories.fromStaticMethods(KvCoder.class));
- codersToRegister.put(List.class, CoderFactories.fromStaticMethods(ListCoder.class));
- codersToRegister.put(Long.class, CoderFactories.fromStaticMethods(VarLongCoder.class));
- codersToRegister.put(Map.class, CoderFactories.fromStaticMethods(MapCoder.class));
- codersToRegister.put(Set.class, CoderFactories.fromStaticMethods(SetCoder.class));
- codersToRegister.put(String.class, CoderFactories.fromStaticMethods(StringUtf8Coder.class));
- codersToRegister.put(TimestampedValue.class,
- CoderFactories.fromStaticMethods(TimestampedValue.TimestampedValueCoder.class));
- codersToRegister.put(Void.class, CoderFactories.fromStaticMethods(VoidCoder.class));
- codersToRegister.put(byte[].class, CoderFactories.fromStaticMethods(ByteArrayCoder.class));
- codersToRegister.put(IntervalWindow.class, CoderFactories.forCoder(IntervalWindow.getCoder()));
+ // Register the standard coders first so they are chosen over ServiceLoader ones
+ List<CoderProvider> codersToRegister = new ArrayList<>();
+ codersToRegister.add(new CommonTypes());
// Enumerate all the CoderRegistrars in a deterministic order, adding all coders to register
- Set<CoderRegistrar> registrars = Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
+ Set<CoderProviderRegistrar> registrars = Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
registrars.addAll(Lists.newArrayList(
- ServiceLoader.load(CoderRegistrar.class, ReflectHelpers.findClassLoader())));
- for (CoderRegistrar registrar : registrars) {
- for (Map.Entry<Class<?>, CoderFactory> entry
- : registrar.getCoderFactoriesToUseForClasses().entrySet()) {
- codersToRegister.put(entry.getKey(), entry.getValue());
- }
- }
-
- // Warn the user if multiple coders want to be registered for the same class
- Map<Class<?>, Collection<CoderFactory>> multipleRegistrations =
- Maps.filterValues(codersToRegister.asMap(), new Predicate<Collection<CoderFactory>>() {
- @Override
- public boolean apply(@Nonnull Collection<CoderFactory> input) {
- return input.size() > 1;
- }
- });
- for (Map.Entry<Class<?>, Collection<CoderFactory>> entry : multipleRegistrations.entrySet()) {
- LOG.warn("Multiple CoderFactory registrations {} found for class {}, using {}.",
- entry.getKey(), entry.getValue(), entry.getValue().iterator().next());
+ ServiceLoader.load(CoderProviderRegistrar.class, ReflectHelpers.findClassLoader())));
+ for (CoderProviderRegistrar registrar : registrars) {
+ codersToRegister.addAll(registrar.getCoderProviders());
}
- // Build a map choosing the first coder within the multimap as the default
- ImmutableMap.Builder<Class<?>, CoderFactory> registeredCoderFactoriesPerClassBuilder =
- ImmutableMap.builder();
- for (Map.Entry<Class<?>, Collection<CoderFactory>> entry
- : codersToRegister.asMap().entrySet()) {
- registeredCoderFactoriesPerClassBuilder.put(
- entry.getKey(), entry.getValue().iterator().next());
- }
- REGISTERED_CODER_FACTORIES_PER_CLASS = registeredCoderFactoriesPerClassBuilder.build();
+ REGISTERED_CODER_FACTORIES = ImmutableList.copyOf(codersToRegister);
}
/**
* Creates a CoderRegistry containing registrations for all standard coders part of the core Java
- * Apache Beam SDK and also any registrations provided by {@link CoderRegistrar coder registrars}.
+ * Apache Beam SDK and also any registrations provided by
+ * {@link CoderProviderRegistrar coder registrars}.
*
- * <p>Multiple registrations for the same class result in the (in order of precedence):
+ * <p>Multiple registrations which can produce a coder for a given type result in a Coder created
+ * by the (in order of precedence):
* <ul>
- * <li>Standard coder part of the core Apache Beam Java SDK being used.</li>
- * <li>The coder from the {@link CoderRegistrar} with the lexicographically smallest
- * {@link Class#getName() class name} being used.</li>
+ * <li>{@link CoderProvider coder providers} registered programmatically through
+ * {@link CoderRegistry#registerCoderProvider}.</li>
+ * <li>{@link CoderProvider coder providers} for core types found within the Apache Beam Java
+ * SDK being used.</li>
+ * <li>The {@link CoderProvider coder providers} from the {@link CoderProviderRegistrar}
+ * with the lexicographically smallest {@link Class#getName() class name} being used.</li>
* </ul>
*/
public static CoderRegistry createDefault() {
@@ -160,97 +175,69 @@ public class CoderRegistry implements CoderProvider {
}
private CoderRegistry() {
- coderFactoryMap = new HashMap<>(REGISTERED_CODER_FACTORIES_PER_CLASS);
- setFallbackCoderProvider(
- CoderProviders.firstOf(SerializableCoder.PROVIDER));
+ coderProviders = new LinkedList<>(REGISTERED_CODER_FACTORIES);
}
/**
- * Registers {@code coderClazz} as the default {@link Coder} class to handle encoding and
- * decoding instances of {@code clazz}, overriding prior registrations if any exist.
- *
- * <p>Supposing {@code T} is the static type corresponding to the {@code clazz}, then
- * {@code coderClazz} should have a static factory method with the following signature:
- *
- * <pre> {@code
- * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...)
- * } </pre>
+ * Registers {@code coderProvider} as a potential {@link CoderProvider} which can produce
+ * {@code Coder} instances.
*
- * <p>This method will be called to create instances of {@code Coder<T>} for values of type
- * {@code T}, passing Coders for each of the generic type parameters of {@code T}. If {@code T}
- * takes no generic type parameters, then the {@code of()} factory method should have no
- * arguments.
+ * <p>This method prioritizes this {@link CoderProvider} over all prior registered coders.
*
- * <p>If {@code T} is a parameterized type, then it should additionally have a method with the
- * following signature:
- *
- * <pre> {@code
- * public static List<Object> getInstanceComponents(T exampleValue);
- * } </pre>
- *
- * <p>This method will be called to decompose a value during the {@link Coder} inference process,
- * to automatically choose {@link Coder Coders} for the components.
- *
- * @param clazz the class of objects to be encoded
- * @param coderClazz a class with static factory methods to provide {@link Coder Coders}
+ * <p>See {@link CoderProviders} for common {@link CoderProvider} patterns.
*/
- public void registerCoder(Class<?> clazz, Class<?> coderClazz) {
- registerCoder(clazz, CoderFactories.fromStaticMethods(coderClazz));
+ public void registerCoderProvider(CoderProvider coderProvider) {
+ coderProviders.addFirst(coderProvider);
}
/**
- * Registers {@code coderFactory} as the default {@link CoderFactory} to produce {@code Coder}
- * instances to decode and encode instances of {@code clazz}. This will override prior
- * registrations if any exist.
+ * Registers the provided {@link Coder} for the given class.
+ *
+ * <p>Note that this is equivalent to {@code registerCoderForType(TypeDescriptor.of(clazz))}. See
+ * {@link #registerCoderForType(TypeDescriptor, Coder)} for further details.
*/
- public void registerCoder(Class<?> clazz, CoderFactory coderFactory) {
- coderFactoryMap.put(clazz, coderFactory);
+ public void registerCoderForClass(Class<?> clazz, Coder<?> coder) {
+ registerCoderForType(TypeDescriptor.of(clazz), coder);
}
/**
- * Register the provided {@link Coder} for encoding all values of the specified {@code Class}.
- * This will override prior registrations if any exist.
+ * Registers the provided {@link Coder} for the given type.
*
- * <p>Not for use with generic rawtypes. Instead, register a {@link CoderFactory} via
- * {@link #registerCoder(Class, CoderFactory)} or ensure your {@code Coder} class has the
- * appropriate static methods and register it directly via {@link #registerCoder(Class, Class)}.
+ * <p>Note that this is equivalent to
+ * {@code registerCoderProvider(CoderProviders.forCoder(type, coder))}. See
+ * {@link #registerCoderProvider} and {@link CoderProviders#forCoder} for further details.
*/
- public <T> void registerCoder(Class<T> rawClazz, Coder<T> coder) {
- checkArgument(
- rawClazz.getTypeParameters().length == 0,
- "CoderRegistry.registerCoder(Class<T>, Coder<T>) may not be used "
- + "with unspecialized generic classes");
-
- CoderFactory factory = CoderFactories.forCoder(coder);
- registerCoder(rawClazz, factory);
+ public void registerCoderForType(TypeDescriptor<?> type, Coder<?> coder) {
+ registerCoderProvider(CoderProviders.forCoder(type, coder));
}
/**
- * Returns the {@link Coder} to use by default for values of the given type.
- *
- * @throws CannotProvideCoderException if there is no default Coder.
+ * Returns the {@link Coder} to use for values of the given class.
+
+ * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
*/
- public <T> Coder<T> getDefaultCoder(TypeDescriptor<T> typeDescriptor)
- throws CannotProvideCoderException {
- return getDefaultCoder(typeDescriptor, Collections.<Type, Coder<?>>emptyMap());
+ public <T> Coder<T> getCoder(Class<T> clazz) throws CannotProvideCoderException {
+ return getCoder(TypeDescriptor.of(clazz));
}
/**
- * See {@link #getDefaultCoder(TypeDescriptor)}.
+ * Returns the {@link Coder} to use for values of the given type.
+ *
+ * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
*/
- @Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
- throws CannotProvideCoderException {
- return getDefaultCoder(typeDescriptor);
+ public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
+ return getCoderFromTypeDescriptor(type, Collections.<Type, Coder<?>>emptyMap());
}
/**
- * Returns the {@link Coder} to use by default for values of the given type, where the given input
+ * Returns the {@link Coder} for values of the given type, where the given input
* type uses the given {@link Coder}.
*
- * @throws CannotProvideCoderException if there is no default Coder.
+ * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
*/
- public <InputT, OutputT> Coder<OutputT> getDefaultCoder(
+ @Deprecated
+ @Internal
+ public <InputT, OutputT> Coder<OutputT> getCoder(
TypeDescriptor<OutputT> typeDescriptor,
TypeDescriptor<InputT> inputTypeDescriptor,
Coder<InputT> inputCoder)
@@ -258,22 +245,26 @@ public class CoderRegistry implements CoderProvider {
checkArgument(typeDescriptor != null);
checkArgument(inputTypeDescriptor != null);
checkArgument(inputCoder != null);
- return getDefaultCoder(
+ return getCoderFromTypeDescriptor(
typeDescriptor, getTypeToCoderBindings(inputTypeDescriptor.getType(), inputCoder));
}
/**
* Returns the {@link Coder} to use on elements produced by this function, given the {@link Coder}
* used for its input elements.
+ *
+ * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
*/
- public <InputT, OutputT> Coder<OutputT> getDefaultOutputCoder(
+ @Deprecated
+ @Internal
+ public <InputT, OutputT> Coder<OutputT> getOutputCoder(
SerializableFunction<InputT, OutputT> fn, Coder<InputT> inputCoder)
throws CannotProvideCoderException {
ParameterizedType fnType = (ParameterizedType)
TypeDescriptor.of(fn.getClass()).getSupertype(SerializableFunction.class).getType();
- return getDefaultCoder(
+ return getCoder(
fn.getClass(),
SerializableFunction.class,
ImmutableMap.of(fnType.getActualTypeArguments()[0], inputCoder),
@@ -284,9 +275,11 @@ public class CoderRegistry implements CoderProvider {
* Returns the {@link Coder} to use for the specified type parameter specialization of the
* subclass, given {@link Coder Coders} to use for all other type parameters (if any).
*
- * @throws CannotProvideCoderException if there is no default Coder.
+ * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
*/
- public <T, OutputT> Coder<OutputT> getDefaultCoder(
+ @Deprecated
+ @Internal
+ public <T, OutputT> Coder<OutputT> getCoder(
Class<? extends T> subClass,
Class<T> baseClass,
Map<Type, ? extends Coder<?>> knownCoders,
@@ -305,147 +298,11 @@ public class CoderRegistry implements CoderProvider {
}
}
- /**
- * Returns the {@link Coder} to use for the provided example value, if it can be determined.
- *
- * @throws CannotProvideCoderException if there is no default {@link Coder} or
- * more than one {@link Coder} matches
- */
- public <T> Coder<T> getDefaultCoder(T exampleValue) throws CannotProvideCoderException {
- Class<?> clazz = exampleValue == null ? Void.class : exampleValue.getClass();
-
- if (clazz.getTypeParameters().length == 0) {
- // Trust that getDefaultCoder returns a valid
- // Coder<T> for non-generic clazz.
- @SuppressWarnings("unchecked")
- Coder<T> coder = (Coder<T>) getDefaultCoder(clazz);
- return coder;
- } else {
- CoderFactory factory = getDefaultCoderFactory(clazz);
-
- List<Object> components = factory.getInstanceComponents(exampleValue);
- if (components == null) {
- throw new CannotProvideCoderException(String.format(
- "Cannot provide coder based on value with class %s: The registered CoderFactory with "
- + "class %s failed to decompose the value, which is required in order to provide "
- + "Coders for the components.",
- clazz.getCanonicalName(), factory.getClass().getCanonicalName()));
- }
-
- // componentcoders = components.map(this.getDefaultCoder)
- List<Coder<?>> componentCoders = new ArrayList<>();
- for (Object component : components) {
- try {
- Coder<?> componentCoder = getDefaultCoder(component);
- componentCoders.add(componentCoder);
- } catch (CannotProvideCoderException exc) {
- throw new CannotProvideCoderException(
- String.format("Cannot provide coder based on value with class %s",
- clazz.getCanonicalName()),
- exc);
- }
- }
-
- // Trust that factory.create maps from valid component Coders
- // to a valid Coder<T>.
- @SuppressWarnings("unchecked")
- Coder<T> coder = (Coder<T>) factory.create(componentCoders);
- return coder;
- }
- }
-
- /**
- * Returns the {@link Coder} to use by default for values of the given class. The following three
- * sources for a {@link Coder} will be attempted, in order:
- *
- * <ol>
- * <li>A {@link Coder} class registered explicitly via a call to {@link #registerCoder},
- * <li>A {@link DefaultCoder} annotation on the class,
- * <li>This registry's fallback {@link CoderProvider}, which may be able to generate a
- * {@link Coder} for an arbitrary class.
- * </ol>
- *
- * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
- */
- public <T> Coder<T> getDefaultCoder(Class<T> clazz) throws CannotProvideCoderException {
-
- CannotProvideCoderException factoryException;
- try {
- CoderFactory coderFactory = getDefaultCoderFactory(clazz);
- LOG.debug("Default coder for {} found by factory", clazz);
- @SuppressWarnings("unchecked")
- Coder<T> coder = (Coder<T>) coderFactory.create(Collections.<Coder<?>>emptyList());
- return coder;
- } catch (CannotProvideCoderException exc) {
- factoryException = exc;
- }
-
- CannotProvideCoderException annotationException;
- try {
- return getDefaultCoderFromAnnotation(clazz);
- } catch (CannotProvideCoderException exc) {
- annotationException = exc;
- }
-
- CannotProvideCoderException fallbackException;
- if (getFallbackCoderProvider() != null) {
- try {
- return getFallbackCoderProvider().getCoder(TypeDescriptor.<T>of(clazz));
- } catch (CannotProvideCoderException exc) {
- fallbackException = exc;
- }
- } else {
- fallbackException = new CannotProvideCoderException("no fallback CoderProvider configured");
- }
-
- // Build up the error message and list of causes.
- StringBuilder messageBuilder = new StringBuilder()
- .append("Unable to provide a default Coder for ").append(clazz.getCanonicalName())
- .append(". Correct one of the following root causes:");
-
- messageBuilder
- .append("\n Building a Coder using a registered CoderFactory failed: ")
- .append(factoryException.getMessage());
-
- messageBuilder
- .append("\n Building a Coder from the @DefaultCoder annotation failed: ")
- .append(annotationException.getMessage());
-
- messageBuilder
- .append("\n Building a Coder from the fallback CoderProvider failed: ")
- .append(fallbackException.getMessage());
-
- throw new CannotProvideCoderException(messageBuilder.toString());
- }
-
- /**
- * Sets the fallback {@link CoderProvider} for this registry. If no other method succeeds in
- * providing a {@code Coder<T>} for a type {@code T}, then the registry will attempt to create
- * a {@link Coder} using this {@link CoderProvider}.
- *
- * <p>By default, this is set to {@link SerializableCoder#PROVIDER}.
- *
- * <p>See {@link #getFallbackCoderProvider}.
- */
- public void setFallbackCoderProvider(CoderProvider coderProvider) {
- fallbackCoderProvider = coderProvider;
- }
-
- /**
- * Returns the fallback {@link CoderProvider} for this registry.
- *
- * <p>See {@link #setFallbackCoderProvider}.
- */
- public CoderProvider getFallbackCoderProvider() {
- return fallbackCoderProvider;
- }
-
/////////////////////////////////////////////////////////////////////////////
/**
* Returns a {@code Map} from each of {@code baseClass}'s type parameters to the {@link Coder} to
- * use by default for it, in the context of {@code subClass}'s specialization of
- * {@code baseClass}.
+ * use for it, in the context of {@code subClass}'s specialization of {@code baseClass}.
*
* <p>If no {@link Coder} can be inferred for a particular type parameter, then that type variable
* will be absent from the returned {@code Map}.
@@ -465,7 +322,7 @@ public class CoderRegistry implements CoderProvider {
* <p>For this reason {@code getDefaultCoders} (plural) does not throw an exception if a
* {@link Coder} for a particular type variable cannot be inferred, but merely omits the entry
* from the returned {@code Map}. It is the responsibility of the caller (usually
- * {@link #getDefaultCoder} to extract the desired coder or throw a
+ * {@link #getCoderFromTypeDescriptor} to extract the desired coder or throw a
* {@link CannotProvideCoderException} when appropriate.
*
* @param subClass the concrete type whose specializations are being inferred
@@ -495,8 +352,7 @@ public class CoderRegistry implements CoderProvider {
/**
* Returns an array listing, for each of {@code baseClass}'s type parameters, the {@link Coder} to
- * use by default for it, in the context of {@code subClass}'s specialization of
- * {@code baseClass}.
+ * use for it, in the context of {@code subClass}'s specialization of {@code baseClass}.
*
* <p>If a {@link Coder} cannot be inferred for a type variable, its slot in the resulting array
* will be {@code null}.
@@ -518,7 +374,7 @@ public class CoderRegistry implements CoderProvider {
* <p>For this reason {@code getDefaultCoders} (plural) does not throw an exception if a
* {@link Coder} for a particular type variable cannot be inferred. Instead, it results in a
* {@code null} in the array. It is the responsibility of the caller (usually
- * {@link #getDefaultCoder} to extract the desired coder or throw a
+ * {@link #getCoderFromTypeDescriptor} to extract the desired coder or throw a
* {@link CannotProvideCoderException} when appropriate.
*
* @param subClass the concrete type whose specializations are being inferred
@@ -571,7 +427,7 @@ public class CoderRegistry implements CoderProvider {
result[i] = knownCoders[i];
} else {
try {
- result[i] = getDefaultCoder(typeArgs[i], context);
+ result[i] = getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgs[i]), context);
} catch (CannotProvideCoderException exc) {
result[i] = null;
}
@@ -695,89 +551,31 @@ public class CoderRegistry implements CoderProvider {
}
/**
- * The map of classes to the CoderFactories to use to create their
- * default Coders.
+ * The list of {@link CoderProvider coder providers} to use to provide Coders.
*/
- private Map<Class<?>, CoderFactory> coderFactoryMap;
-
- /**
- * A provider of coders for types where no coder is registered.
- */
- private CoderProvider fallbackCoderProvider;
-
- /**
- * Returns the {@link CoderFactory} to use to create default {@link Coder Coders} for instances of
- * the given class, or {@code null} if there is no default {@link CoderFactory} registered.
- */
- private CoderFactory getDefaultCoderFactory(Class<?> clazz) throws CannotProvideCoderException {
- CoderFactory coderFactoryOrNull = coderFactoryMap.get(clazz);
- if (coderFactoryOrNull != null) {
- return coderFactoryOrNull;
- } else {
- throw new CannotProvideCoderException(
- String.format("Cannot provide coder based on value with class %s: No CoderFactory has "
- + "been registered for the class.", clazz.getCanonicalName()));
- }
- }
-
- /**
- * Returns the {@link Coder} returned according to the {@link CoderProvider} from any
- * {@link DefaultCoder} annotation on the given class.
- */
- private <T> Coder<T> getDefaultCoderFromAnnotation(Class<T> clazz)
- throws CannotProvideCoderException {
- DefaultCoder defaultAnnotation = clazz.getAnnotation(DefaultCoder.class);
- if (defaultAnnotation == null) {
- throw new CannotProvideCoderException(
- String.format("Class %s does not have a @DefaultCoder annotation.",
- clazz.getCanonicalName()));
- }
-
- LOG.debug("DefaultCoder annotation found for {} with value {}",
- clazz, defaultAnnotation.value());
- CoderProvider coderProvider = CoderProviders.fromStaticMethods(defaultAnnotation.value());
- return coderProvider.getCoder(TypeDescriptor.of(clazz));
- }
-
- /**
- * Returns the {@link Coder} to use by default for values of the given type,
- * in a context where the given types use the given coders.
- *
- * @throws CannotProvideCoderException if a coder cannot be provided
- */
- private <T> Coder<T> getDefaultCoder(
- TypeDescriptor<T> typeDescriptor,
- Map<Type, Coder<?>> typeCoderBindings)
- throws CannotProvideCoderException {
-
- Coder<?> defaultCoder = getDefaultCoder(typeDescriptor.getType(), typeCoderBindings);
- LOG.debug("Default coder for {}: {}", typeDescriptor, defaultCoder);
- @SuppressWarnings("unchecked")
- Coder<T> result = (Coder<T>) defaultCoder;
- return result;
- }
+ private LinkedList<CoderProvider> coderProviders;
/**
- * Returns the {@link Coder} to use by default for values of the given type,
+ * Returns a {@link Coder} to use for values of the given type,
* in a context where the given types use the given coders.
*
* @throws CannotProvideCoderException if a coder cannot be provided
*/
- private Coder<?> getDefaultCoder(Type type, Map<Type, Coder<?>> typeCoderBindings)
+ private <T> Coder<T> getCoderFromTypeDescriptor(
+ TypeDescriptor<T> typeDescriptor, Map<Type, Coder<?>> typeCoderBindings)
throws CannotProvideCoderException {
- Coder<?> coder = typeCoderBindings.get(type);
- if (coder != null) {
- return coder;
- }
- if (type instanceof Class<?>) {
- Class<?> clazz = (Class<?>) type;
- return getDefaultCoder(clazz);
+ Type type = typeDescriptor.getType();
+ Coder<?> coder;
+ if (typeCoderBindings.containsKey(type)) {
+ coder = typeCoderBindings.get(type);
+ } else if (type instanceof Class<?>) {
+ coder = getCoderFromFactories(typeDescriptor, Collections.<Coder<?>>emptyList());
} else if (type instanceof ParameterizedType) {
- return getDefaultCoder((ParameterizedType) type, typeCoderBindings);
+ coder = getCoderFromParameterizedType((ParameterizedType) type, typeCoderBindings);
} else if (type instanceof TypeVariable) {
- return getDefaultCoder(TypeDescriptor.of(type).getRawType());
+ coder = getCoderFromFactories(typeDescriptor, Collections.<Coder<?>>emptyList());
} else if (type instanceof WildcardType) {
- // No default coder for an unknown generic type.
+ // No coder for an unknown generic type.
throw new CannotProvideCoderException(
String.format("Cannot provide a coder for type variable %s"
+ " (declared by %s) because the actual type is unknown due to erasure.",
@@ -788,72 +586,70 @@ public class CoderRegistry implements CoderProvider {
throw new RuntimeException(
"Internal error: unexpected kind of Type: " + type);
}
+
+ LOG.debug("Coder for {}: {}", typeDescriptor, coder);
+ @SuppressWarnings("unchecked")
+ Coder<T> result = (Coder<T>) coder;
+ return result;
}
/**
- * Returns the {@link Coder} to use by default for values of the given
+ * Returns a {@link Coder} to use for values of the given
* parameterized type, in a context where the given types use the
* given {@link Coder Coders}.
*
* @throws CannotProvideCoderException if no coder can be provided
*/
- private Coder<?> getDefaultCoder(
+ private Coder<?> getCoderFromParameterizedType(
ParameterizedType type,
Map<Type, Coder<?>> typeCoderBindings)
throws CannotProvideCoderException {
- CannotProvideCoderException factoryException;
- try {
- return getDefaultCoderFromFactory(type, typeCoderBindings);
- } catch (CannotProvideCoderException exc) {
- factoryException = exc;
- }
-
- CannotProvideCoderException annotationException;
- try {
- Class<?> rawClazz = (Class<?>) type.getRawType();
- return getDefaultCoderFromAnnotation(rawClazz);
- } catch (CannotProvideCoderException exc) {
- annotationException = exc;
- }
-
- // Build up the error message and list of causes.
- StringBuilder messageBuilder = new StringBuilder()
- .append("Unable to provide a default Coder for ").append(type)
- .append(". Correct one of the following root causes:");
-
- messageBuilder
- .append("\n Building a Coder using a registered CoderFactory failed: ")
- .append(factoryException.getMessage());
-
- messageBuilder
- .append("\n Building a Coder from the @DefaultCoder annotation failed: ")
- .append(annotationException.getMessage());
-
- throw new CannotProvideCoderException(messageBuilder.toString());
- }
-
- private Coder<?> getDefaultCoderFromFactory(
- ParameterizedType type,
- Map<Type, Coder<?>> typeCoderBindings)
- throws CannotProvideCoderException {
- Class<?> rawClazz = (Class<?>) type.getRawType();
- CoderFactory coderFactory = getDefaultCoderFactory(rawClazz);
List<Coder<?>> typeArgumentCoders = new ArrayList<>();
for (Type typeArgument : type.getActualTypeArguments()) {
try {
- Coder<?> typeArgumentCoder = getDefaultCoder(typeArgument,
- typeCoderBindings);
+ Coder<?> typeArgumentCoder =
+ getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings);
typeArgumentCoders.add(typeArgumentCoder);
} catch (CannotProvideCoderException exc) {
- throw new CannotProvideCoderException(
- String.format("Cannot provide coder for parameterized type %s: %s",
- type,
- exc.getMessage()),
- exc);
+ throw new CannotProvideCoderException(
+ String.format("Cannot provide coder for parameterized type %s: %s",
+ type,
+ exc.getMessage()),
+ exc);
}
}
- return coderFactory.create(typeArgumentCoders);
+ return getCoderFromFactories(TypeDescriptor.of(type), typeArgumentCoders);
+ }
+
+ /**
+ * Attempts to create a {@link Coder} from any registered {@link CoderProvider} returning
+ * the first successfully created instance.
+ */
+ private Coder<?> getCoderFromFactories(
+ TypeDescriptor<?> typeDescriptor, List<Coder<?>> typeArgumentCoders)
+ throws CannotProvideCoderException {
+ List<CannotProvideCoderException> suppressedExceptions = new ArrayList<>();
+ for (CoderProvider coderProvider : coderProviders) {
+ try {
+ return coderProvider.coderFor(typeDescriptor, typeArgumentCoders);
+ } catch (CannotProvideCoderException e) {
+ // Add all failures as suppressed exceptions.
+ suppressedExceptions.add(e);
+ }
+ }
+
+ // Build up the error message and list of causes.
+ StringBuilder messageBuilder = new StringBuilder()
+ .append("Unable to provide a Coder for ").append(typeDescriptor).append(".\n")
+ .append(" Building a Coder using a registered CoderProvider failed.\n")
+ .append(" See suppressed exceptions for detailed failures.");
+ CannotProvideCoderException exceptionOnFailure =
+ new CannotProvideCoderException(messageBuilder.toString());
+ for (CannotProvideCoderException suppressedException : suppressedExceptions) {
+ exceptionOnFailure.addSuppressed(suppressedException);
+ }
+ throw exceptionOnFailure;
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
index 523b69b..1762308 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
@@ -46,15 +46,6 @@ public class CollectionCoder<T> extends IterableLikeCoder<T, Collection<T>> {
return decodedElements;
}
- /**
- * Returns the first element in this collection if it is non-empty,
- * otherwise returns {@code null}.
- */
- public static <T> List<Object> getInstanceComponents(
- Collection<T> exampleValue) {
- return getInstanceComponentsHelper(exampleValue);
- }
-
protected CollectionCoder(Coder<T> elemCoder) {
super(elemCoder, "Collection");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index f33e210..edbaa7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -94,14 +94,6 @@ public abstract class CustomCoder<T> extends Coder<T>
}
/**
- * Returns an empty list. A {@link CustomCoder} by default will not have component coders that are
- * used for inference.
- */
- public static <T> List<Object> getInstanceComponents(T exampleValue) {
- return Collections.emptyList();
- }
-
- /**
* {@inheritDoc}
*
* @throws NonDeterministicException a {@link CustomCoder} is presumed
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
index 9a976f9..6eff9e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
@@ -17,45 +17,33 @@
*/
package org.apache.beam.sdk.coders;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * The {@link DefaultCoder} annotation
- * specifies a default {@link Coder} class to handle encoding and decoding
- * instances of the annotated class.
+ * The {@link DefaultCoder} annotation specifies a {@link Coder} class to handle encoding and
+ * decoding instances of the annotated class.
*
- * <p>The specified {@link Coder} must satisfy the requirements of
- * {@link CoderProviders#fromStaticMethods}. Two classes provided by the SDK that
- * are intended for use with this annotation include {@link SerializableCoder}
- * and {@link AvroCoder}.
+ * <p>The specified {@link Coder} must have the following method:
+ * <pre>
+ * {@code public static CoderProvider getCoderProvider()}.
+ * </pre>
*
- * <p>To configure the use of Java serialization as the default
- * for a class, annotate the class to use
- * {@link SerializableCoder} as follows:
- *
- * <pre><code>{@literal @}DefaultCoder(SerializableCoder.class)
- * public class MyCustomDataType implements Serializable {
- * // ...
- * }</code></pre>
- *
- * <p>Similarly, to configure the use of
- * {@link AvroCoder} as the default:
- * <pre><code>{@literal @}DefaultCoder(AvroCoder.class)
- * public class MyCustomDataType {
- * public MyCustomDataType() {} // Avro requires an empty constructor.
- * // ...
- * }</code></pre>
- *
- * <p>Coders specified explicitly via
- * {@link PCollection#setCoder}
- * take precedence, followed by Coders registered at runtime via
- * {@link CoderRegistry#registerCoder}. See {@link CoderRegistry} for a more detailed discussion
- * of the precedence rules.
+ * <p>Coders specified explicitly via {@link PCollection#setCoder} take precedence, followed by
+ * Coders found at runtime via {@link CoderRegistry#getCoder}.
+ * See {@link CoderRegistry} for a more detailed discussion of the precedence rules.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@@ -63,4 +51,77 @@ import org.apache.beam.sdk.values.PCollection;
@SuppressWarnings("rawtypes")
public @interface DefaultCoder {
Class<? extends Coder> value();
+
+ /**
+ * A {@link CoderProviderRegistrar} that registers a {@link CoderProvider} which can use
+ * the {@code @DefaultCoder} annotation to provide {@link CoderProvider coder providers} that
+ * creates {@link Coder}s.
+ */
+ @AutoService(CoderProviderRegistrar.class)
+ class DefaultCoderProviderRegistrar implements CoderProviderRegistrar {
+
+ @Override
+ public List<CoderProvider> getCoderProviders() {
+ return ImmutableList.<CoderProvider>of(new DefaultCoderProvider());
+ }
+
+ /**
+ * A {@link CoderProvider} that uses the {@code @DefaultCoder} annotation to provide
+ * {@link CoderProvider coder providers} that create {@link Coder}s.
+ */
+ static class DefaultCoderProvider extends CoderProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultCoderProvider.class);
+
+ /**
+ * Returns the {@link Coder} returned according to the {@link CoderProvider} from any
+ * {@link DefaultCoder} annotation on the given class.
+ */
+ @Override
+ public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+ List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+
+ Class<?> clazz = typeDescriptor.getRawType();
+ DefaultCoder defaultAnnotation = clazz.getAnnotation(DefaultCoder.class);
+ if (defaultAnnotation == null) {
+ throw new CannotProvideCoderException(
+ String.format("Class %s does not have a @DefaultCoder annotation.",
+ clazz.getName()));
+ }
+
+ if (defaultAnnotation.value() == null) {
+ throw new CannotProvideCoderException(
+ String.format("Class %s has a @DefaultCoder annotation with a null value.",
+ clazz.getName()));
+ }
+
+ LOG.debug("DefaultCoder annotation found for {} with value {}",
+ clazz, defaultAnnotation.value());
+
+ Method coderProviderMethod;
+ try {
+ coderProviderMethod = defaultAnnotation.value().getMethod("getCoderProvider");
+ } catch (NoSuchMethodException e) {
+ throw new CannotProvideCoderException(String.format(
+ "Unable to find 'public static CoderProvider getCoderProvider()' on %s",
+ defaultAnnotation.value()),
+ e);
+ }
+
+ CoderProvider coderProvider;
+ try {
+ coderProvider = (CoderProvider) coderProviderMethod.invoke(null);
+ } catch (IllegalAccessException
+ | IllegalArgumentException
+ | InvocationTargetException
+ | NullPointerException
+ | ExceptionInInitializerError e) {
+ throw new CannotProvideCoderException(String.format(
+ "Unable to invoke 'public static CoderProvider getCoderProvider()' on %s",
+ defaultAnnotation.value()),
+ e);
+ }
+ return coderProvider.coderFor(typeDescriptor, componentCoders);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index 02c3d0f..b600b1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -41,15 +41,6 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
return decodedElements;
}
- /**
- * Returns the first element in this iterable if it is non-empty,
- * otherwise returns {@code null}.
- */
- public static <T> List<Object> getInstanceComponents(
- Iterable<T> exampleValue) {
- return getInstanceComponentsHelper(exampleValue);
- }
-
protected IterableCoder(Coder<T> elemCoder) {
super(elemCoder, "Iterable");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 8e10ca2..52b9c34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -75,18 +75,6 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
private final Coder<T> elementCoder;
private final String iterableName;
- /**
- * Returns the first element in the iterable-like {@code exampleValue} if it is non-empty,
- * otherwise returns {@code null}.
- */
- protected static <T, IterableT extends Iterable<T>>
- List<Object> getInstanceComponentsHelper(IterableT exampleValue) {
- for (T value : exampleValue) {
- return Arrays.<Object>asList(value);
- }
- return null;
- }
-
protected IterableLikeCoder(Coder<T> elementCoder, String iterableName) {
checkArgument(elementCoder != null, "element Coder for IterableLikeCoder must not be null");
checkArgument(iterableName != null, "iterable name for IterableLikeCoder must not be null");
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 35b7449..da7f03c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -39,13 +39,6 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
return new KvCoder<>(keyCoder, valueCoder);
}
- public static <K, V> List<Object> getInstanceComponents(
- KV<K, V> exampleValue) {
- return Arrays.asList(
- exampleValue.getKey(),
- exampleValue.getValue());
- }
-
public Coder<K> getKeyCoder() {
return keyCoder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
index 70bbf93..25f3ee9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
@@ -40,14 +40,6 @@ public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
return decodedElements;
}
- /**
- * Returns the first element in this list if it is non-empty,
- * otherwise returns {@code null}.
- */
- public static <T> List<Object> getInstanceComponents(List<T> exampleValue) {
- return getInstanceComponentsHelper(exampleValue);
- }
-
protected ListCoder(Coder<T> elemCoder) {
super(elemCoder, "List");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index da2bf50..9e3c768 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -50,18 +50,6 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
return new MapCoder<>(keyCoder, valueCoder);
}
- /**
- * Returns the key and value for an arbitrary element of this map,
- * if it is non-empty, otherwise returns {@code null}.
- */
- public static <K, V> List<Object> getInstanceComponents(
- Map<K, V> exampleValue) {
- for (Map.Entry<K, V> entry : exampleValue.entrySet()) {
- return Arrays.asList(entry.getKey(), entry.getValue());
- }
- return null;
- }
-
public Coder<K> getKeyCoder() {
return keyCoder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index b52b9db..e3b2959 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -17,12 +17,15 @@
*/
package org.apache.beam.sdk.coders;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.util.List;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
@@ -63,29 +66,45 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
}
/**
- * A {@link CoderProvider} that constructs a {@link SerializableCoder}
- * for any class that implements serializable.
+ * Returns a {@link CoderProvider} which uses the {@link SerializableCoder} if possible for
+ * all types.
+ *
+ * <p>This method is invoked reflectively from {@link DefaultCoder}.
*/
- public static final CoderProvider PROVIDER = new CoderProvider() {
+ @SuppressWarnings("unused")
+ public static CoderProvider getCoderProvider() {
+ return new SerializableCoderProvider();
+ }
+
+ /**
+ * A {@link CoderProviderRegistrar} which registers a {@link CoderProvider} which can handle
+ * serializable types.
+ */
+ @AutoService(CoderProviderRegistrar.class)
+ public static class SerializableCoderProviderRegistrar implements CoderProviderRegistrar {
+
@Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
- throws CannotProvideCoderException {
- Class<?> clazz = typeDescriptor.getRawType();
- if (Serializable.class.isAssignableFrom(clazz)) {
- @SuppressWarnings("unchecked")
- Class<? extends Serializable> serializableClazz =
- (Class<? extends Serializable>) clazz;
- @SuppressWarnings("unchecked")
- Coder<T> coder = (Coder<T>) SerializableCoder.of(serializableClazz);
- return coder;
- } else {
- throw new CannotProvideCoderException(
- "Cannot provide SerializableCoder because " + typeDescriptor
- + " does not implement Serializable");
- }
+ public List<CoderProvider> getCoderProviders() {
+ return ImmutableList.of(getCoderProvider());
}
- };
+ }
+ /**
+ * A {@link CoderProvider} that constructs a {@link SerializableCoder} for any class that
+ * implements serializable.
+ */
+ static class SerializableCoderProvider extends CoderProvider {
+ @Override
+ public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
+ List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
+ if (Serializable.class.isAssignableFrom(typeDescriptor.getRawType())) {
+ return SerializableCoder.of((TypeDescriptor) typeDescriptor);
+ }
+ throw new CannotProvideCoderException(
+ "Cannot provide SerializableCoder because " + typeDescriptor
+ + " does not implement Serializable");
+ }
+ }
private final Class<T> type;
private final TypeDescriptor<T> typeDescriptor;
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
index da16165..baec128 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
@@ -56,15 +56,6 @@ public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
new TypeParameter<T>() {}, getElemCoder().getEncodedTypeDescriptor());
}
- /**
- * Returns the first element in this set if it is non-empty,
- * otherwise returns {@code null}.
- */
- public static <T> List<Object> getInstanceComponents(
- Set<T> exampleValue) {
- return getInstanceComponentsHelper(exampleValue);
- }
-
/////////////////////////////////////////////////////////////////////////////
// Internal operations below here.
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
index 7fc094f..9a7b125 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
@@ -37,13 +37,6 @@ public class VarLongCoder extends StructuredCoder<Long> {
return INSTANCE;
}
- /**
- * Returns an empty list. {@link VarLongCoder} has no components.
- */
- public static <T> List<Object> getInstanceComponents(T ignored) {
- return Collections.emptyList();
- }
-
/////////////////////////////////////////////////////////////////////////////
private static final VarLongCoder INSTANCE = new VarLongCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
index 29990cd..20ec300 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
@@ -125,14 +125,14 @@ public class CombineFnBase {
@Override
public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder)
throws CannotProvideCoderException {
- return registry.getDefaultCoder(getClass(), AbstractGlobalCombineFn.class,
+ return registry.getCoder(getClass(), AbstractGlobalCombineFn.class,
ImmutableMap.<Type, Coder<?>>of(getInputTVariable(), inputCoder), getAccumTVariable());
}
@Override
public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<InputT> inputCoder)
throws CannotProvideCoderException {
- return registry.getDefaultCoder(getClass(), AbstractGlobalCombineFn.class,
+ return registry.getCoder(getClass(), AbstractGlobalCombineFn.class,
ImmutableMap.<Type, Coder<?>>of(getInputTVariable(), inputCoder, getAccumTVariable(),
this.getAccumulatorCoder(registry, inputCoder)),
getOutputTVariable());
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index d4c97bc..0515ed5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -339,7 +339,7 @@ public class CombineFns {
List<Coder<Object>> coders = Lists.newArrayList();
for (int i = 0; i < combineFnCount; ++i) {
Coder<Object> inputCoder =
- registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
+ registry.getOutputCoder(extractInputFns.get(i), dataCoder);
coders.add(combineFns.get(i).getAccumulatorCoder(registry, inputCoder));
}
return new ComposedAccumulatorCoder(coders);
@@ -478,7 +478,7 @@ public class CombineFns {
List<Coder<Object>> coders = Lists.newArrayList();
for (int i = 0; i < combineFnCount; ++i) {
Coder<Object> inputCoder =
- registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
+ registry.getOutputCoder(extractInputFns.get(i), dataCoder);
coders.add(combineFnWithContexts.get(i).getAccumulatorCoder(registry, inputCoder));
}
return new ComposedAccumulatorCoder(coders);
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 5624f2f..7af8fb8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -27,17 +27,25 @@ import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CollectionCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.OffsetBasedSource;
@@ -317,7 +325,7 @@ public class Create<T> {
if (coder.isPresent()) {
return coder.get();
} else if (typeDescriptor.isPresent()) {
- return input.getPipeline().getCoderRegistry().getDefaultCoder(typeDescriptor.get());
+ return input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
} else {
return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), elems);
}
@@ -566,7 +574,7 @@ public class Create<T> {
if (elementCoder.isPresent()) {
return elementCoder.get();
} else if (typeDescriptor.isPresent()) {
- return input.getPipeline().getCoderRegistry().getDefaultCoder(typeDescriptor.get());
+ return input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
} else {
Iterable<T> rawElements =
Iterables.transform(
@@ -611,7 +619,7 @@ public class Create<T> {
if (elementClazz.getTypeParameters().length == 0) {
try {
@SuppressWarnings("unchecked") // elementClazz is a wildcard type
- Coder<T> coder = (Coder<T>) registry.getDefaultCoder(TypeDescriptor.of(elementClazz));
+ Coder<T> coder = (Coder<T>) registry.getCoder(TypeDescriptor.of(elementClazz));
return coder;
} catch (CannotProvideCoderException exc) {
// Can't get a coder from the class of the elements, try with the elements next
@@ -619,11 +627,20 @@ public class Create<T> {
}
// If that fails, try to deduce a coder using the elements themselves
- Optional<Coder<T>> coder = Optional.absent();
- for (T elem : elems) {
- Coder<T> c = registry.getDefaultCoder(elem);
+ return (Coder<T>) inferCoderFromObjects(registry, elems);
+ }
+
+ /**
+ * Attempts to infer the {@link Coder} of the elements ensuring that the returned coder is
+ * equivalent for all elements.
+ */
+ private static Coder<?> inferCoderFromObjects(
+ CoderRegistry registry, Iterable<?> elems) throws CannotProvideCoderException {
+ Optional<Coder<?>> coder = Optional.absent();
+ for (Object elem : elems) {
+ Coder<?> c = inferCoderFromObject(registry, elem);
if (!coder.isPresent()) {
- coder = Optional.of(c);
+ coder = (Optional) Optional.of(c);
} else if (!Objects.equals(c, coder.get())) {
throw new CannotProvideCoderException(
"Cannot provide coder for elements of "
@@ -633,11 +650,48 @@ public class Create<T> {
+ " Based on their values, they do not all default to the same Coder.");
}
}
+ if (coder.isPresent()) {
+ return coder.get();
+ }
- if (!coder.isPresent()) {
- throw new CannotProvideCoderException(
- "Unable to infer a coder. Please register " + "a coder for ");
+ throw new CannotProvideCoderException("Cannot provide coder for elements of "
+ + Create.class.getSimpleName()
+ + ":"
+ + " For their common class, no coder could be provided."
+ + " Based on their values, no coder could be inferred.");
+ }
+
+ /**
+ * Attempt to infer the type for some very common Apache Beam parameterized types.
+ *
+ * <p>TODO: Instead, build a TypeDescriptor so that the {@link CoderRegistry} is invoked
+ * for the type instead of hard coding the coders for common types.
+ */
+ private static Coder<?> inferCoderFromObject(CoderRegistry registry, Object o)
+ throws CannotProvideCoderException {
+ if (o == null) {
+ return VoidCoder.of();
+ } else if (o instanceof TimestampedValue) {
+ return TimestampedValueCoder.of(
+ inferCoderFromObject(registry, ((TimestampedValue) o).getValue()));
+ } else if (o instanceof List) {
+ return ListCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+ } else if (o instanceof Set) {
+ return SetCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+ } else if (o instanceof Collection) {
+ return CollectionCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+ } else if (o instanceof Iterable) {
+ return IterableCoder.of(inferCoderFromObjects(registry, ((Iterable) o)));
+ } else if (o instanceof Map) {
+ return MapCoder.of(
+ inferCoderFromObjects(registry, ((Map) o).keySet()),
+ inferCoderFromObjects(registry, ((Map) o).entrySet()));
+ } else if (o instanceof KV) {
+ return KvCoder.of(
+ inferCoderFromObject(registry, ((KV) o).getKey()),
+ inferCoderFromObject(registry, ((KV) o).getValue()));
+ } else {
+ return registry.getCoder(o.getClass());
}
- return coder.get();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 15abd98..d5df944 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -117,7 +117,7 @@ import org.apache.beam.sdk.values.TupleTag;
* mapping from Java types to the default Coder to use, for a standard
* set of Java types; users can extend this mapping for additional
* types, via
- * {@link org.apache.beam.sdk.coders.CoderRegistry#registerCoder}.
+ * {@link org.apache.beam.sdk.coders.CoderRegistry#registerCoderProvider}.
* If this inference process fails, either because the Java type was
* not known at run-time (e.g., due to Java's "erasure" of generic
* types) or there was no default Coder registered, then the Coder
@@ -281,7 +281,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
* @throws CannotProvideCoderException if no coder can be inferred
*/
protected Coder<?> getDefaultOutputCoder() throws CannotProvideCoderException {
- throw new CannotProvideCoderException("PTransform.getDefaultOutputCoder called.");
+ throw new CannotProvideCoderException("PTransform.getOutputCoder called.");
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index e67dbe1..edf1419 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -477,10 +477,10 @@ public class ParDo {
Type typeArgument = typeArguments[i];
TypeDescriptor<?> typeDescriptor = TypeDescriptor.of(typeArgument);
try {
- coders[i] = coderRegistry.getDefaultCoder(typeDescriptor);
+ coders[i] = coderRegistry.getCoder(typeDescriptor);
} catch (CannotProvideCoderException e) {
try {
- coders[i] = coderRegistry.getDefaultCoder(
+ coders[i] = coderRegistry.getCoder(
typeDescriptor, inputCoder.getEncodedTypeDescriptor(), inputCoder);
} catch (CannotProvideCoderException ignored) {
// Since not all type arguments will have a registered coder we ignore this exception.
@@ -623,7 +623,7 @@ public class ParDo {
@SuppressWarnings("unchecked")
protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input)
throws CannotProvideCoderException {
- return input.getPipeline().getCoderRegistry().getDefaultCoder(
+ return input.getPipeline().getCoderRegistry().getCoder(
getFn().getOutputTypeDescriptor(),
getFn().getInputTypeDescriptor(),
((PCollection<InputT>) input).getCoder());
@@ -767,7 +767,7 @@ public class ParDo {
throws CannotProvideCoderException {
@SuppressWarnings("unchecked")
Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
- return input.getPipeline().getCoderRegistry().getDefaultCoder(
+ return input.getPipeline().getCoderRegistry().getCoder(
output.getTypeDescriptor(),
getFn().getInputTypeDescriptor(),
inputCoder);
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index dd38006..c66d1b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -124,9 +124,9 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
Coder<K> keyCoder;
CoderRegistry coderRegistry = in.getPipeline().getCoderRegistry();
if (keyClass == null) {
- keyCoder = coderRegistry.getDefaultOutputCoder(fn, in.getCoder());
+ keyCoder = coderRegistry.getOutputCoder(fn, in.getCoder());
} else {
- keyCoder = coderRegistry.getDefaultCoder(TypeDescriptor.of(keyClass));
+ keyCoder = coderRegistry.getCoder(TypeDescriptor.of(keyClass));
}
// TODO: Remove when we can set the coder inference context.
result.setCoder(KvCoder.of(keyCoder, in.getCoder()));
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index 0276ba6..0bfb875 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -87,12 +87,6 @@ public class GlobalWindow extends BoundedWindow {
return Collections.emptyList();
}
- /**
- * Returns an empty list. The Global Window Coder has no components.
- */
- public static <T> List<Object> getInstanceComponents(T exampleValue) {
- return Collections.emptyList();
- }
private Coder() {}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index fd2a2d8..46ece09 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -178,13 +178,6 @@ public class IntervalWindow extends BoundedWindow
return INSTANCE;
}
- /**
- * Returns an empty list. {@link IntervalWindowCoder} has no components.
- */
- public static <T> List<Object> getInstanceComponents(T value) {
- return Collections.emptyList();
- }
-
@Override
public void encode(IntervalWindow window, OutputStream outStream, Context context)
throws IOException, CoderException {
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 1095fb8..f210fd8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -142,7 +142,7 @@ public class PCollection<T> extends PValueBase implements PValue {
CannotProvideCoderException inferFromTokenException = null;
if (token != null) {
try {
- return new CoderOrFailure<>(registry.getDefaultCoder(token), null);
+ return new CoderOrFailure<>(registry.getCoder(token), null);
} catch (CannotProvideCoderException exc) {
inferFromTokenException = exc;
// Attempt to detect when the token came from a TupleTag used for a ParDo output,
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index a9f3929..c172885 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -132,10 +132,6 @@ public class TimestampedValue<V> {
return valueCoder;
}
- public static <T> List<Object> getInstanceComponents(TimestampedValue<T> exampleValue) {
- return Arrays.<Object>asList(exampleValue.getValue());
- }
-
@Override
public TypeDescriptor<TimestampedValue<T>> getEncodedTypeDescriptor() {
return new TypeDescriptor<TimestampedValue<T>>() {}.where(
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java
deleted file mode 100644
index 4ffc9c1..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderFactoriesTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.Collections;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link CoderFactories}.
- */
-@RunWith(JUnit4.class)
-public class CoderFactoriesTest {
-
- /**
- * Ensures that a few of our standard atomic coder classes
- * can each be built into a factory that works as expected.
- * It is presumed that testing a few, not all, suffices to
- * exercise CoderFactoryFromStaticMethods.
- */
- @Test
- public void testAtomicCoderClassFactories() {
- checkAtomicCoderFactory(StringUtf8Coder.class, StringUtf8Coder.of());
- checkAtomicCoderFactory(DoubleCoder.class, DoubleCoder.of());
- checkAtomicCoderFactory(ByteArrayCoder.class, ByteArrayCoder.of());
- }
-
- /**
- * Checks that {#link CoderFactories.fromStaticMethods} successfully
- * builds a working {@link CoderFactory} from {@link KvCoder KvCoder.class}.
- */
- @Test
- public void testKvCoderFactory() {
- CoderFactory kvCoderFactory = CoderFactories.fromStaticMethods(KvCoder.class);
- assertEquals(
- KvCoder.of(DoubleCoder.of(), DoubleCoder.of()),
- kvCoderFactory.create(Arrays.asList(DoubleCoder.of(), DoubleCoder.of())));
- }
-
- /**
- * Checks that {#link CoderFactories.fromStaticMethods} successfully
- * builds a working {@link CoderFactory} from {@link ListCoder ListCoder.class}.
- */
- @Test
- public void testListCoderFactory() {
- CoderFactory listCoderFactory = CoderFactories.fromStaticMethods(ListCoder.class);
-
- assertEquals(
- ListCoder.of(DoubleCoder.of()),
- listCoderFactory.create(Arrays.asList(DoubleCoder.of())));
- }
-
- /**
- * Checks that {#link CoderFactories.fromStaticMethods} successfully
- * builds a working {@link CoderFactory} from {@link IterableCoder IterableCoder.class}.
- */
- @Test
- public void testIterableCoderFactory() {
- CoderFactory iterableCoderFactory = CoderFactories.fromStaticMethods(IterableCoder.class);
-
- assertEquals(
- IterableCoder.of(DoubleCoder.of()),
- iterableCoderFactory.create(Arrays.asList(DoubleCoder.of())));
- }
-
- ///////////////////////////////////////////////////////////////////////
-
- /**
- * Checks that an atomic coder class can be converted into
- * a factory that then yields a coder equal to the example
- * provided.
- */
- private <T> void checkAtomicCoderFactory(
- Class<? extends Coder<T>> coderClazz,
- Coder<T> expectedCoder) {
- CoderFactory factory = CoderFactories.fromStaticMethods(coderClazz);
- @SuppressWarnings("unchecked")
- Coder<T> actualCoder = (Coder<T>) factory.create(Collections.<Coder<?>>emptyList());
- assertEquals(expectedCoder, actualCoder);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f8e2cf89/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
index 44be56d..2aa8351 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderProvidersTest.java
@@ -17,54 +17,78 @@
*/
package org.apache.beam.sdk.coders;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
-import org.junit.Rule;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
- * Tests for {@link CoderFactories}.
+ * Tests for {@link CoderProviders}.
*/
@RunWith(JUnit4.class)
public class CoderProvidersTest {
+ @Test
+ public void testCoderProvidersFromStaticMethodsForParameterlessTypes() throws Exception {
+ CoderProvider factory = CoderProviders.fromStaticMethods(String.class, StringUtf8Coder.class);
+ assertEquals(StringUtf8Coder.of(),
+ factory.coderFor(TypeDescriptors.strings(), Collections.<Coder<?>>emptyList()));
- @Rule
- public ExpectedException thrown = ExpectedException.none();
+ factory = CoderProviders.fromStaticMethods(Double.class, DoubleCoder.class);
+ assertEquals(DoubleCoder.of(),
+ factory.coderFor(TypeDescriptors.doubles(), Collections.<Coder<?>>emptyList()));
- @Test
- public void testAvroThenSerializableStringMap() throws Exception {
- CoderProvider provider = CoderProviders.firstOf(AvroCoder.PROVIDER, SerializableCoder.PROVIDER);
- Coder<Map<String, String>> coder =
- provider.getCoder(new TypeDescriptor<Map<String, String>>(){});
- assertThat(coder, instanceOf(AvroCoder.class));
+ factory = CoderProviders.fromStaticMethods(byte[].class, ByteArrayCoder.class);
+ assertEquals(ByteArrayCoder.of(),
+ factory.coderFor(TypeDescriptor.of(byte[].class), Collections.<Coder<?>>emptyList()));
}
+ /**
+ * Checks that {#link CoderProviders.fromStaticMethods} successfully
+ * builds a working {@link CoderProvider} from {@link KvCoder KvCoder.class}.
+ */
@Test
- public void testThrowingThenSerializable() throws Exception {
- CoderProvider provider =
- CoderProviders.firstOf(new ThrowingCoderProvider(), SerializableCoder.PROVIDER);
- Coder<Integer> coder = provider.getCoder(new TypeDescriptor<Integer>(){});
- assertThat(coder, instanceOf(SerializableCoder.class));
+ public void testKvCoderProvider() throws Exception {
+ TypeDescriptor<KV<Double, Double>> type =
+ TypeDescriptors.kvs(TypeDescriptors.doubles(), TypeDescriptors.doubles());
+ CoderProvider kvCoderProvider = CoderProviders.fromStaticMethods(KV.class, KvCoder.class);
+ assertEquals(
+ KvCoder.of(DoubleCoder.of(), DoubleCoder.of()),
+ kvCoderProvider.coderFor(type, Arrays.asList(DoubleCoder.of(), DoubleCoder.of())));
}
+ /**
+ * Checks that {#link CoderProviders.fromStaticMethods} successfully
+ * builds a working {@link CoderProvider} from {@link ListCoder ListCoder.class}.
+ */
@Test
- public void testNullThrows() throws Exception {
- CoderProvider provider = CoderProviders.firstOf(new ThrowingCoderProvider());
- thrown.expect(CannotProvideCoderException.class);
- thrown.expectMessage("ThrowingCoderProvider");
- provider.getCoder(new TypeDescriptor<Integer>(){});
+ public void testListCoderProvider() throws Exception {
+ TypeDescriptor<List<Double>> type = TypeDescriptors.lists(TypeDescriptors.doubles());
+ CoderProvider listCoderProvider = CoderProviders.fromStaticMethods(List.class, ListCoder.class);
+
+ assertEquals(
+ ListCoder.of(DoubleCoder.of()),
+ listCoderProvider.coderFor(type, Arrays.asList(DoubleCoder.of())));
}
- private static class ThrowingCoderProvider implements CoderProvider {
- @Override
- public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
- throw new CannotProvideCoderException("ThrowingCoderProvider cannot ever provide a Coder");
- }
+ /**
+ * Checks that {#link CoderProviders.fromStaticMethods} successfully
+ * builds a working {@link CoderProvider} from {@link IterableCoder IterableCoder.class}.
+ */
+ @Test
+ public void testIterableCoderProvider() throws Exception {
+ TypeDescriptor<Iterable<Double>> type = TypeDescriptors.iterables(TypeDescriptors.doubles());
+ CoderProvider iterableCoderProvider =
+ CoderProviders.fromStaticMethods(Iterable.class, IterableCoder.class);
+
+ assertEquals(
+ IterableCoder.of(DoubleCoder.of()),
+ iterableCoderProvider.coderFor(type, Arrays.asList(DoubleCoder.of())));
}
}