You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/03 02:48:00 UTC

[1/2] beam git commit: Add Additional CloudObjectTranslators

Repository: beam
Updated Branches:
  refs/heads/master 2d22485c1 -> dc0fdcb7e


Add Additional CloudObjectTranslators

Add IterableLikeCoders, MapCoder

Add UnionCoder, CoGbkResultCoder, and NullableCoder translators.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/73cdd994
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/73cdd994
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/73cdd994

Branch: refs/heads/master
Commit: 73cdd99466bef0c35158d4dd89ac10e9cb056782
Parents: 2d22485
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 1 22:29:34 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue May 2 18:07:00 2017 -0700

----------------------------------------------------------------------
 .../dataflow/util/CloudObjectTranslators.java   | 199 +++++++++++++++++++
 ...aultCoderCloudObjectTranslatorRegistrar.java |  21 +-
 .../runners/dataflow/util/CloudObjectsTest.java |  33 ++-
 .../beam/sdk/transforms/join/CoGbkResult.java   |   8 +
 4 files changed, 256 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
index c27bee7..f3e3312 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.runners.dataflow.util;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -27,9 +29,15 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.IterableLikeCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
+import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
 import org.apache.beam.sdk.util.CloudObject;
@@ -39,6 +47,7 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.StringUtils;
 import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.TupleTag;
 
 /** Utilities for creating {@link CloudObjectTranslator} instances for {@link Coder Coders}. */
 class CloudObjectTranslators {
@@ -373,4 +382,194 @@ class CloudObjectTranslators {
       }
     };
   }
+  public static CloudObjectTranslator<IterableLikeCoder> iterableLike( // per-subclass translator
+      final Class<? extends IterableLikeCoder> clazz) { // must have a static of(Coder) factory
+    return new CloudObjectTranslator<IterableLikeCoder>() {
+      @Override
+      public CloudObject toCloudObject(IterableLikeCoder target) {
+        CloudObject base = CloudObject.forClass(clazz); // elem coder is the sole component
+        return addComponents(base, Collections.<Coder<?>>singletonList(target.getElemCoder()));
+      }
+
+      @Override
+      public IterableLikeCoder<?, ?> fromCloudObject(CloudObject cloudObject) {
+        List<Coder<?>> elemCoderList = getComponents(cloudObject);
+        checkArgument(
+            elemCoderList.size() == 1,
+            "Expected 1 component for %s, got %s",
+            cloudObject.getClassName(),
+            elemCoderList.size());
+        return InstanceBuilder.ofType(clazz) // reflectively calls clazz's of(Coder)
+            .fromFactoryMethod("of")
+            .withArg(Coder.class, elemCoderList.get(0))
+            .build();
+      }
+
+      @Override
+      public Class<? extends IterableLikeCoder> getSupportedClass() {
+        return clazz;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObject.forClass(clazz).getClassName();
+      }
+    };
+  }
+
+  public static CloudObjectTranslator<MapCoder> map() { // components: [key coder, value coder]
+    return new CloudObjectTranslator<MapCoder>() {
+      @Override
+      public CloudObject toCloudObject(MapCoder target) {
+        CloudObject base = CloudObject.forClass(MapCoder.class);
+        return addComponents(
+            base, ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()));
+      }
+
+      @Override
+      public MapCoder<?, ?> fromCloudObject(CloudObject cloudObject) {
+        List<Coder<?>> components = getComponents(cloudObject);
+        checkArgument(
+            components.size() == 2,
+            "Expected 2 components for %s, got %s",
+            MapCoder.class.getSimpleName(),
+            components.size());
+        return MapCoder.of(components.get(0), components.get(1)); // key, value order
+      }
+
+      @Override
+      public Class<? extends MapCoder> getSupportedClass() {
+        return MapCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObject.forClass(MapCoder.class).getClassName();
+      }
+    };
+  }
+
+  public static CloudObjectTranslator<NullableCoder> nullable() { // sole component: value coder
+    return new CloudObjectTranslator<NullableCoder>() {
+      @Override
+      public CloudObject toCloudObject(NullableCoder target) {
+        CloudObject base = CloudObject.forClass(NullableCoder.class);
+        return addComponents(base, Collections.<Coder<?>>singletonList(target.getValueCoder()));
+      }
+
+      @Override
+      public NullableCoder<?> fromCloudObject(CloudObject cloudObject) {
+        List<Coder<?>> componentList = getComponents(cloudObject);
+        checkArgument(
+            componentList.size() == 1,
+            "Expected 1 component for %s, got %s",
+            NullableCoder.class.getSimpleName(),
+            componentList.size());
+        return NullableCoder.of(componentList.get(0));
+      }
+
+      @Override
+      public Class<? extends NullableCoder> getSupportedClass() {
+        return NullableCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObject.forClass(NullableCoder.class).getClassName();
+      }
+    };
+  }
+
+  public static CloudObjectTranslator<UnionCoder> union() { // components: element coders, in order
+    return new CloudObjectTranslator<UnionCoder>() {
+      @Override
+      public CloudObject toCloudObject(UnionCoder target) {
+        return addComponents(CloudObject.forClass(UnionCoder.class), target.getElementCoders());
+      }
+
+      @Override
+      public UnionCoder fromCloudObject(CloudObject cloudObject) {
+        List<Coder<?>> elementCoders = getComponents(cloudObject); // NOTE(review): count unchecked
+        return UnionCoder.of(elementCoders);
+      }
+
+      @Override
+      public Class<? extends UnionCoder> getSupportedClass() {
+        return UnionCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObject.forClass(UnionCoder.class).getClassName();
+      }
+    };
+  }
+
+  public static CloudObjectTranslator<CoGbkResultCoder> coGroupByKeyResult() {
+    return new CloudObjectTranslator<CoGbkResultCoder>() {
+      @Override
+      public CloudObject toCloudObject(CoGbkResultCoder target) {
+        CloudObject base = CloudObject.forClass(CoGbkResultCoder.class);
+        Structs.addObject( // the schema is stored as a property, not as a component
+            base, PropertyNames.CO_GBK_RESULT_SCHEMA, toCloudObject(target.getSchema()));
+        return addComponents(base, Collections.singletonList(target.getUnionCoder())); // sole one
+      }
+
+      private CloudObject toCloudObject(CoGbkResultSchema schema) { // tags stored by id only
+        CloudObject result = CloudObject.forClass(CoGbkResultSchema.class);
+        List<CloudObject> tags = new ArrayList<>(schema.getTupleTagList().size());
+        for (TupleTag<?> tag : schema.getTupleTagList().getAll()) {
+          CloudObject tagCloudObject = CloudObject.forClass(TupleTag.class);
+          Structs.addString(tagCloudObject, PropertyNames.VALUE, tag.getId());
+          tags.add(tagCloudObject);
+        }
+        Structs.addList(result, PropertyNames.TUPLE_TAGS, tags);
+        return result;
+      }
+
+      @Override
+      public CoGbkResultCoder fromCloudObject(CloudObject cloudObject) {
+        List<Coder<?>> components = getComponents(cloudObject);
+        checkArgument(
+            components.size() == 1,
+            "Expected 1 component for %s, got %s",
+            CoGbkResultCoder.class.getSimpleName(),
+            components.size());
+        checkArgument(
+            components.get(0) instanceof UnionCoder, // guards the cast below
+            "Expected only component to be a %s, got %s",
+            UnionCoder.class.getSimpleName(),
+            components.get(0).getClass().getName());
+        return CoGbkResultCoder.of(
+            schemaFromCloudObject(
+                CloudObject.fromSpec(
+                    Structs.getObject(cloudObject, PropertyNames.CO_GBK_RESULT_SCHEMA))),
+            (UnionCoder) components.get(0));
+      }
+
+      @Override
+      public Class<? extends CoGbkResultCoder> getSupportedClass() {
+        return CoGbkResultCoder.class;
+      }
+
+      private CoGbkResultSchema schemaFromCloudObject(CloudObject cloudObject) {
+        List<TupleTag<?>> tags = new ArrayList<>(); // rebuilt from the stored tag ids
+        List<Map<String, Object>> serializedTags =
+            Structs.getListOfMaps(
+                cloudObject,
+                PropertyNames.TUPLE_TAGS,
+                Collections.<Map<String, Object>>emptyList()); // assumed: default if absent
+        for (Map<String, Object> serializedTag : serializedTags) {
+          TupleTag<?> tag = new TupleTag<>(Structs.getString(serializedTag, PropertyNames.VALUE));
+          tags.add(tag);
+        }
+        return CoGbkResultSchema.of(tags);
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObject.forClass(CoGbkResultCoder.class).getClassName();
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 5cae13f..4567098 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -37,9 +37,12 @@ import org.apache.beam.sdk.coders.BigIntegerCoder;
 import org.apache.beam.sdk.coders.BitSetCoder;
 import org.apache.beam.sdk.coders.ByteCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CollectionCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.DurationCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TextualIntegerCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -56,7 +59,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
 public class DefaultCoderCloudObjectTranslatorRegistrar
     implements CoderCloudObjectTranslatorRegistrar {
   private static final List<CloudObjectTranslator<? extends Coder>> DEFAULT_TRANSLATORS =
-      ImmutableList.<CloudObjectTranslator<? extends Coder>>of(
+      ImmutableList.of(
           CloudObjectTranslators.globalWindow(),
           CloudObjectTranslators.intervalWindow(),
           CloudObjectTranslators.bytes(),
@@ -67,7 +70,16 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
           CloudObjectTranslators.windowedValue(),
           new AvroCoderCloudObjectTranslator(),
           new SerializableCoderCloudObjectTranslator(),
+          CloudObjectTranslators.iterableLike(CollectionCoder.class),
+          CloudObjectTranslators.iterableLike(ListCoder.class),
+          CloudObjectTranslators.iterableLike(SetCoder.class),
+          CloudObjectTranslators.map(),
+          CloudObjectTranslators.nullable(),
+          CloudObjectTranslators.union(),
+          CloudObjectTranslators.coGroupByKeyResult(),
           CloudObjectTranslators.javaSerialized());
+  // TODO: Add a translator for ElementAndRestrictionCoder. It lives in runners-core, but
+  // probably needs to move to core-construction first.
   @VisibleForTesting
   static final ImmutableSet<Class<? extends Coder>> KNOWN_ATOMIC_CODERS =
       ImmutableSet.<Class<? extends Coder>>of(
@@ -91,6 +103,11 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
           TextualIntegerCoder.class,
           VarIntCoder.class,
           VoidCoder.class);
+  // TODO: Add WriteBundlesToFiles.ResultCoder.class.
+  // TODO: Add the atomic GCP IO coders:
+  //   TableRowInfoCoder.class,
+  //   PubsubUnboundedSink.OutgoingMessageCoder.class,
+  //   PubsubUnboundedSource.PubsubCheckpointCoder.class
 
   @Override
   public Map<String, CloudObjectTranslator<? extends Coder>> classNamesToTranslators() {
@@ -106,7 +123,7 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
   public Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
       classesToTranslators() {
     Builder<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> builder =
-        ImmutableMap.<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>builder();
+        ImmutableMap.builder();
     for (CloudObjectTranslator<? extends Coder> defaultTranslator : DEFAULT_TRANSLATORS) {
       builder.put(defaultTranslator.getSupportedClass(), defaultTranslator);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index b670268..2e66d43 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -37,18 +37,27 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CollectionCoder;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
+import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
@@ -84,7 +93,7 @@ public class CloudObjectsTest {
       Set<Class<? extends Coder>> missing = new HashSet<>();
       missing.addAll(defaultCoderTranslators);
       missing.removeAll(testedClasses);
-      assertThat(missing, emptyIterable());
+      assertThat("Coders with custom serializers should all be tested", missing, emptyIterable());
     }
 
     @Test
@@ -117,10 +126,28 @@ public class CloudObjectsTest {
                   WindowedValue.getFullCoder(
                       KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()),
                       IntervalWindow.getCoder()))
-              .add(VarLongCoder.of())
               .add(ByteArrayCoder.of())
+              .add(VarLongCoder.of())
               .add(SerializableCoder.of(Record.class))
-              .add(AvroCoder.of(Record.class));
+              .add(AvroCoder.of(Record.class))
+              .add(CollectionCoder.of(VarLongCoder.of()))
+              .add(ListCoder.of(VarLongCoder.of()))
+              .add(SetCoder.of(VarLongCoder.of()))
+              .add(MapCoder.of(VarLongCoder.of(), ByteArrayCoder.of()))
+              .add(NullableCoder.of(IntervalWindow.getCoder()))
+              .add(
+                  UnionCoder.of(
+                      ImmutableList.<Coder<?>>of(
+                          VarLongCoder.of(),
+                          ByteArrayCoder.of(),
+                          KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()))))
+              .add(
+                  CoGbkResultCoder.of(
+                      CoGbkResultSchema.of(
+                          ImmutableList.<TupleTag<?>>of(
+                              new TupleTag<Long>(), new TupleTag<byte[]>())),
+                      UnionCoder.of(
+                          ImmutableList.<Coder<?>>of(VarLongCoder.of(), ByteArrayCoder.of()))));
       for (Class<? extends Coder> atomicCoder :
           DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) {
         dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build());

http://git-wip-us.apache.org/repos/asf/beam/blob/73cdd994/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 6c62cbe..02e1185 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -230,6 +230,14 @@ public class CoGbkResult {
       return ImmutableList.of(unionCoder);
     }
 
+    public CoGbkResultSchema getSchema() { // e.g. for CloudObject translation
+      return schema;
+    }
+
+    public UnionCoder getUnionCoder() { // e.g. for CloudObject translation
+      return unionCoder;
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     public void encode(


[2/2] beam git commit: [BEAM-2020] Add additional CloudObject Translators

Posted by lc...@apache.org.
[BEAM-2020] Add additional CloudObject Translators

This closes #2805


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc0fdcb7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc0fdcb7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc0fdcb7

Branch: refs/heads/master
Commit: dc0fdcb7e62067fdc629b8558659fce7e37e4856
Parents: 2d22485 73cdd99
Author: Luke Cwik <lc...@google.com>
Authored: Tue May 2 19:47:48 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue May 2 19:47:48 2017 -0700

----------------------------------------------------------------------
 .../dataflow/util/CloudObjectTranslators.java   | 199 +++++++++++++++++++
 ...aultCoderCloudObjectTranslatorRegistrar.java |  21 +-
 .../runners/dataflow/util/CloudObjectsTest.java |  33 ++-
 .../beam/sdk/transforms/join/CoGbkResult.java   |   8 +
 4 files changed, 256 insertions(+), 5 deletions(-)
----------------------------------------------------------------------