You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 19:54:07 UTC
[42/50] [abbrv] beam git commit: Add Additional CloudObjectTranslators
Add Additional CloudObjectTranslators
Add IterableLikeCoders, MapCoder
Add UnionCoder, CoGbkResultCoder, and NullableCoder translators.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/73cdd994
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/73cdd994
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/73cdd994
Branch: refs/heads/gearpump-runner
Commit: 73cdd99466bef0c35158d4dd89ac10e9cb056782
Parents: 2d22485
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 1 22:29:34 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue May 2 18:07:00 2017 -0700
----------------------------------------------------------------------
.../dataflow/util/CloudObjectTranslators.java | 199 +++++++++++++++++++
...aultCoderCloudObjectTranslatorRegistrar.java | 21 +-
.../runners/dataflow/util/CloudObjectsTest.java | 33 ++-
.../beam/sdk/transforms/join/CoGbkResult.java | 8 +
4 files changed, 256 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
index c27bee7..f3e3312 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
@@ -18,6 +18,8 @@
package org.apache.beam.runners.dataflow.util;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
@@ -27,9 +29,15 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.IterableLikeCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
+import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.CloudObject;
@@ -39,6 +47,7 @@ import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.StringUtils;
import org.apache.beam.sdk.util.Structs;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.TupleTag;
/** Utilities for creating {@link CloudObjectTranslator} instances for {@link Coder Coders}. */
class CloudObjectTranslators {
@@ -373,4 +382,194 @@ class CloudObjectTranslators {
}
};
}
+ /**
+ * Returns a {@link CloudObjectTranslator} for the given {@link IterableLikeCoder} subclass.
+ *
+ * <p>The cloud object records {@code clazz} as its class name and stores the coder's element
+ * coder as its single component. Decoding rebuilds the coder by invoking the static factory
+ * method named {@code of} on {@code clazz}, passing that component as a {@link Coder}.
+ *
+ * <p>NOTE(review): assumes {@code clazz} declares a static {@code of(Coder)} factory; that
+ * assumption is only exercised when {@code fromCloudObject} runs.
+ *
+ * @param clazz the concrete {@link IterableLikeCoder} class this translator handles
+ */
+ public static CloudObjectTranslator<IterableLikeCoder> iterableLike(
+ final Class<? extends IterableLikeCoder> clazz) {
+ return new CloudObjectTranslator<IterableLikeCoder>() {
+ @Override
+ public CloudObject toCloudObject(IterableLikeCoder target) {
+ CloudObject base = CloudObject.forClass(clazz);
+ // The element coder is the only component written.
+ return addComponents(base, Collections.<Coder<?>>singletonList(target.getElemCoder()));
+ }
+
+ @Override
+ public IterableLikeCoder<?, ?> fromCloudObject(CloudObject cloudObject) {
+ List<Coder<?>> elemCoderList = getComponents(cloudObject);
+ // Mirrors toCloudObject: exactly one component (the element coder) is expected.
+ checkArgument(
+ elemCoderList.size() == 1,
+ "Expected 1 component for %s, got %s",
+ cloudObject.getClassName(),
+ elemCoderList.size());
+ // Reflectively call clazz.of(Coder) so a single translator serves every subclass.
+ return InstanceBuilder.ofType(clazz)
+ .fromFactoryMethod("of")
+ .withArg(Coder.class, elemCoderList.get(0))
+ .build();
+ }
+
+ @Override
+ public Class<? extends IterableLikeCoder> getSupportedClass() {
+ return clazz;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObject.forClass(clazz).getClassName();
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} for {@link MapCoder}.
+ *
+ * <p>The key coder and the value coder are stored, in that order, as the two components of the
+ * cloud object. Decoding rebuilds the coder with {@link MapCoder#of}.
+ */
+ public static CloudObjectTranslator<MapCoder> map() {
+ return new CloudObjectTranslator<MapCoder>() {
+ @Override
+ public CloudObject toCloudObject(MapCoder target) {
+ CloudObject base = CloudObject.forClass(MapCoder.class);
+ return addComponents(
+ base, ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()));
+ }
+
+ @Override
+ public MapCoder<?, ?> fromCloudObject(CloudObject cloudObject) {
+ List<Coder<?>> components = getComponents(cloudObject);
+ // Component order matches toCloudObject: key coder first, then value coder.
+ checkArgument(
+ components.size() == 2,
+ "Expected 2 components for %s, got %s",
+ MapCoder.class.getSimpleName(),
+ components.size());
+ return MapCoder.of(components.get(0), components.get(1));
+ }
+
+ @Override
+ public Class<? extends MapCoder> getSupportedClass() {
+ return MapCoder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObject.forClass(MapCoder.class).getClassName();
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} for {@link NullableCoder}.
+ *
+ * <p>The wrapped value coder is stored as the single component of the cloud object. Decoding
+ * rebuilds the coder with {@link NullableCoder#of}.
+ */
+ public static CloudObjectTranslator<NullableCoder> nullable() {
+ return new CloudObjectTranslator<NullableCoder>() {
+ @Override
+ public CloudObject toCloudObject(NullableCoder target) {
+ CloudObject base = CloudObject.forClass(NullableCoder.class);
+ return addComponents(base, Collections.<Coder<?>>singletonList(target.getValueCoder()));
+ }
+
+ @Override
+ public NullableCoder<?> fromCloudObject(CloudObject cloudObject) {
+ List<Coder<?>> componentList = getComponents(cloudObject);
+ // Mirrors toCloudObject: exactly one component (the value coder) is expected.
+ checkArgument(
+ componentList.size() == 1,
+ "Expected 1 component for %s, got %s",
+ NullableCoder.class.getSimpleName(),
+ componentList.size());
+ return NullableCoder.of(componentList.get(0));
+ }
+
+ @Override
+ public Class<? extends NullableCoder> getSupportedClass() {
+ return NullableCoder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObject.forClass(NullableCoder.class).getClassName();
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} for {@link UnionCoder}.
+ *
+ * <p>Each element coder of the union is stored, in order, as a component of the cloud object.
+ * Decoding passes the components back to {@link UnionCoder#of} in the same order.
+ */
+ public static CloudObjectTranslator<UnionCoder> union() {
+ return new CloudObjectTranslator<UnionCoder>() {
+ @Override
+ public CloudObject toCloudObject(UnionCoder target) {
+ return addComponents(CloudObject.forClass(UnionCoder.class), target.getElementCoders());
+ }
+
+ @Override
+ public UnionCoder fromCloudObject(CloudObject cloudObject) {
+ // NOTE(review): unlike the sibling translators, the component count is not validated
+ // here; whatever list getComponents returns is handed directly to UnionCoder.of.
+ List<Coder<?>> elementCoders = getComponents(cloudObject);
+ return UnionCoder.of(elementCoders);
+ }
+
+ @Override
+ public Class<? extends UnionCoder> getSupportedClass() {
+ return UnionCoder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObject.forClass(UnionCoder.class).getClassName();
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} for {@link CoGbkResultCoder}.
+ *
+ * <p>The coder's {@link CoGbkResultSchema} is stored under
+ * {@code PropertyNames.CO_GBK_RESULT_SCHEMA} as a nested cloud object. That object's
+ * {@code PropertyNames.TUPLE_TAGS} list holds one entry per {@link TupleTag}, and each entry
+ * records the tag id under {@code PropertyNames.VALUE}. The coder's {@link UnionCoder} is
+ * stored as the single component.
+ */
+ public static CloudObjectTranslator<CoGbkResultCoder> coGroupByKeyResult() {
+ return new CloudObjectTranslator<CoGbkResultCoder>() {
+ @Override
+ public CloudObject toCloudObject(CoGbkResultCoder target) {
+ CloudObject base = CloudObject.forClass(CoGbkResultCoder.class);
+ // The schema is not a Coder, so it is written as a property rather than a component.
+ Structs.addObject(
+ base, PropertyNames.CO_GBK_RESULT_SCHEMA, toCloudObject(target.getSchema()));
+ return addComponents(base, Collections.singletonList(target.getUnionCoder()));
+ }
+
+ /** Encodes the schema as a list of tag cloud objects, one per tag, in schema order. */
+ private CloudObject toCloudObject(CoGbkResultSchema schema) {
+ CloudObject result = CloudObject.forClass(CoGbkResultSchema.class);
+ List<CloudObject> tags = new ArrayList<>(schema.getTupleTagList().size());
+ for (TupleTag<?> tag : schema.getTupleTagList().getAll()) {
+ // Only the tag id is written; schemaFromCloudObject re-creates tags from these ids.
+ CloudObject tagCloudObject = CloudObject.forClass(TupleTag.class);
+ Structs.addString(tagCloudObject, PropertyNames.VALUE, tag.getId());
+ tags.add(tagCloudObject);
+ }
+ Structs.addList(result, PropertyNames.TUPLE_TAGS, tags);
+ return result;
+ }
+
+ @Override
+ public CoGbkResultCoder fromCloudObject(CloudObject cloudObject) {
+ List<Coder<?>> components = getComponents(cloudObject);
+ checkArgument(
+ components.size() == 1,
+ "Expected 1 component for %s, got %s",
+ CoGbkResultCoder.class.getSimpleName(),
+ components.size());
+ // The only component must be the UnionCoder written by toCloudObject.
+ checkArgument(
+ components.get(0) instanceof UnionCoder,
+ "Expected only component to be a %s, got %s",
+ UnionCoder.class.getSimpleName(),
+ components.get(0).getClass().getName());
+ return CoGbkResultCoder.of(
+ schemaFromCloudObject(
+ CloudObject.fromSpec(
+ Structs.getObject(cloudObject, PropertyNames.CO_GBK_RESULT_SCHEMA))),
+ (UnionCoder) components.get(0));
+ }
+
+ @Override
+ public Class<? extends CoGbkResultCoder> getSupportedClass() {
+ return CoGbkResultCoder.class;
+ }
+
+ /** Rebuilds a schema from the tag list written by {@code toCloudObject(CoGbkResultSchema)}. */
+ private CoGbkResultSchema schemaFromCloudObject(CloudObject cloudObject) {
+ List<TupleTag<?>> tags = new ArrayList<>();
+ // A missing TUPLE_TAGS entry yields an empty tag list rather than an error.
+ List<Map<String, Object>> serializedTags =
+ Structs.getListOfMaps(
+ cloudObject,
+ PropertyNames.TUPLE_TAGS,
+ Collections.<Map<String, Object>>emptyList());
+ for (Map<String, Object> serializedTag : serializedTags) {
+ // Each tag is re-created from its serialized id only.
+ TupleTag<?> tag = new TupleTag<>(Structs.getString(serializedTag, PropertyNames.VALUE));
+ tags.add(tag);
+ }
+ return CoGbkResultSchema.of(tags);
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObject.forClass(CoGbkResultCoder.class).getClassName();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 5cae13f..4567098 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -37,9 +37,12 @@ import org.apache.beam.sdk.coders.BigIntegerCoder;
import org.apache.beam.sdk.coders.BitSetCoder;
import org.apache.beam.sdk.coders.ByteCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CollectionCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.DurationCoder;
import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TextualIntegerCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -56,7 +59,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
public class DefaultCoderCloudObjectTranslatorRegistrar
implements CoderCloudObjectTranslatorRegistrar {
private static final List<CloudObjectTranslator<? extends Coder>> DEFAULT_TRANSLATORS =
- ImmutableList.<CloudObjectTranslator<? extends Coder>>of(
+ ImmutableList.of(
CloudObjectTranslators.globalWindow(),
CloudObjectTranslators.intervalWindow(),
CloudObjectTranslators.bytes(),
@@ -67,7 +70,16 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
CloudObjectTranslators.windowedValue(),
new AvroCoderCloudObjectTranslator(),
new SerializableCoderCloudObjectTranslator(),
+ CloudObjectTranslators.iterableLike(CollectionCoder.class),
+ CloudObjectTranslators.iterableLike(ListCoder.class),
+ CloudObjectTranslators.iterableLike(SetCoder.class),
+ CloudObjectTranslators.map(),
+ CloudObjectTranslators.nullable(),
+ CloudObjectTranslators.union(),
+ CloudObjectTranslators.coGroupByKeyResult(),
CloudObjectTranslators.javaSerialized());
+ // TODO: Add a translator for ElementAndRestrictionCoder. It currently lives in runners-core,
+ // but it probably needs to move to core-construction first.
@VisibleForTesting
static final ImmutableSet<Class<? extends Coder>> KNOWN_ATOMIC_CODERS =
ImmutableSet.<Class<? extends Coder>>of(
@@ -91,6 +103,11 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
TextualIntegerCoder.class,
VarIntCoder.class,
VoidCoder.class);
+ // TODO: Add WriteBundlesToFiles.ResultCoder.class.
+ // TODO: Add the atomic GCP IO coders:
+ //   TableRowInfoCoder.class,
+ //   PubsubUnboundedSink.OutgoingMessageCoder.class,
+ //   PubsubUnboundedSource.PubsubCheckpointCoder.class
@Override
public Map<String, CloudObjectTranslator<? extends Coder>> classNamesToTranslators() {
@@ -106,7 +123,7 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
public Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
classesToTranslators() {
Builder<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> builder =
- ImmutableMap.<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>builder();
+ ImmutableMap.builder();
for (CloudObjectTranslator<? extends Coder> defaultTranslator : DEFAULT_TRANSLATORS) {
builder.put(defaultTranslator.getSupportedClass(), defaultTranslator);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index b670268..2e66d43 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -37,18 +37,27 @@ import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CollectionCoder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
+import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
@@ -84,7 +93,7 @@ public class CloudObjectsTest {
Set<Class<? extends Coder>> missing = new HashSet<>();
missing.addAll(defaultCoderTranslators);
missing.removeAll(testedClasses);
- assertThat(missing, emptyIterable());
+ assertThat("Coders with custom serializers should all be tested", missing, emptyIterable());
}
@Test
@@ -117,10 +126,28 @@ public class CloudObjectsTest {
WindowedValue.getFullCoder(
KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()),
IntervalWindow.getCoder()))
- .add(VarLongCoder.of())
.add(ByteArrayCoder.of())
+ .add(VarLongCoder.of())
.add(SerializableCoder.of(Record.class))
- .add(AvroCoder.of(Record.class));
+ .add(AvroCoder.of(Record.class))
+ .add(CollectionCoder.of(VarLongCoder.of()))
+ .add(ListCoder.of(VarLongCoder.of()))
+ .add(SetCoder.of(VarLongCoder.of()))
+ .add(MapCoder.of(VarLongCoder.of(), ByteArrayCoder.of()))
+ .add(NullableCoder.of(IntervalWindow.getCoder()))
+ .add(
+ UnionCoder.of(
+ ImmutableList.<Coder<?>>of(
+ VarLongCoder.of(),
+ ByteArrayCoder.of(),
+ KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()))))
+ .add(
+ CoGbkResultCoder.of(
+ CoGbkResultSchema.of(
+ ImmutableList.<TupleTag<?>>of(
+ new TupleTag<Long>(), new TupleTag<byte[]>())),
+ UnionCoder.of(
+ ImmutableList.<Coder<?>>of(VarLongCoder.of(), ByteArrayCoder.of()))));
for (Class<? extends Coder> atomicCoder :
DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) {
dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build());
http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 6c62cbe..02e1185 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -230,6 +230,14 @@ public class CoGbkResult {
return ImmutableList.of(unionCoder);
}
+ /** Returns the {@link CoGbkResultSchema} held by this coder. */
+ public CoGbkResultSchema getSchema() {
+ return schema;
+ }
+
+ /** Returns the {@link UnionCoder} held by this coder. */
+ public UnionCoder getUnionCoder() {
+ return unionCoder;
+ }
+
@Override
@SuppressWarnings("unchecked")
public void encode(