You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/01 23:45:56 UTC

[1/2] beam git commit: Add CloudObjectTranslators for Avro, Serializable

Repository: beam
Updated Branches:
  refs/heads/master cd813fba0 -> 4867c995f


Add CloudObjectTranslators for Avro, Serializable

This means AvroCoder and SerializableCoder can be sent to Dataflow with a
stable serialization.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ce97566
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ce97566
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ce97566

Branch: refs/heads/master
Commit: 2ce97566be76dacd9d529bde090e1f66f9ae819b
Parents: cd813fb
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 20:05:09 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 1 16:45:17 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |  5 ++
 .../util/AvroCoderCloudObjectTranslator.java    | 62 +++++++++++++++++++
 ...aultCoderCloudObjectTranslatorRegistrar.java |  2 +
 .../SerializableCoderCloudObjectTranslator.java | 65 ++++++++++++++++++++
 .../runners/dataflow/util/CloudObjectsTest.java | 13 ++--
 .../sdk/extensions/protobuf/ProtoCoder.java     |  4 ++
 6 files changed, 146 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index cb0fa7f..e75abbd 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -289,6 +289,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
       <!-- Note: when relocating guava, ensure guava-testlib is not also relocated by
            excluding com.google.common.**.testing.* -->
       <groupId>com.google.guava</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
new file mode 100644
index 0000000..444849d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import org.apache.avro.Schema;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.Structs;
+
+/** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
+class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder> {
+  private static final String TYPE_FIELD = "type";
+  private static final String SCHEMA_FIELD = "schema";
+
+  @Override
+  public CloudObject toCloudObject(AvroCoder target) {
+    CloudObject base = CloudObject.forClass(AvroCoder.class);
+    Structs.addString(base, SCHEMA_FIELD, target.getSchema().toString());
+    Structs.addString(base, TYPE_FIELD, target.getType().getName());
+    return base;
+  }
+
+  @Override
+  public AvroCoder<?> fromCloudObject(CloudObject cloudObject) {
+    Schema.Parser parser = new Schema.Parser();
+    String className = Structs.getString(cloudObject, TYPE_FIELD);
+    String schemaString = Structs.getString(cloudObject, SCHEMA_FIELD);
+    try {
+      Class<?> type = Class.forName(className);
+      Schema schema = parser.parse(schemaString);
+      return AvroCoder.of(type, schema);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  @Override
+  public Class<? extends AvroCoder> getSupportedClass() {
+    return AvroCoder.class;
+  }
+
+  @Override
+  public String cloudObjectClassName() {
+    return CloudObject.forClass(AvroCoder.class).getClassName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 3b9fa95..29f047f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -65,6 +65,8 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
           CloudObjectTranslators.stream(),
           CloudObjectTranslators.pair(),
           CloudObjectTranslators.windowedValue(),
+          new AvroCoderCloudObjectTranslator(),
+          new SerializableCoderCloudObjectTranslator(),
           CloudObjectTranslators.custom());
   @VisibleForTesting
   static final ImmutableSet<Class<? extends Coder>> KNOWN_ATOMIC_CODERS =

http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
new file mode 100644
index 0000000..67c021c
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.Structs;
+
+/** A {@link CloudObjectTranslator} for {@link SerializableCoder}. */
+class SerializableCoderCloudObjectTranslator implements CloudObjectTranslator<SerializableCoder> {
+  private static final String TYPE_FIELD = "type";
+
+  @Override
+  public CloudObject toCloudObject(SerializableCoder target) {
+    CloudObject base = CloudObject.forClass(SerializableCoder.class);
+    Structs.addString(base, TYPE_FIELD, target.getRecordType().getName());
+    return base;
+  }
+
+  @Override
+  public SerializableCoder<?> fromCloudObject(CloudObject cloudObject) {
+    String className = Structs.getString(cloudObject, TYPE_FIELD);
+    try {
+      Class<? extends Serializable> targetClass =
+          (Class<? extends Serializable>) Class.forName(className);
+      checkArgument(
+          Serializable.class.isAssignableFrom(targetClass),
+          "Target class %s does not extend %s",
+          targetClass.getName(),
+          Serializable.class.getSimpleName());
+      return SerializableCoder.of(targetClass);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  @Override
+  public Class<SerializableCoder> getSupportedClass() {
+    return SerializableCoder.class;
+  }
+
+  @Override
+  public String cloudObjectClassName() {
+    return CloudObject.forClass(SerializableCoder.class).getClassName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index fdea285..a6a3f25 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -28,8 +28,10 @@ import com.google.common.collect.ImmutableList.Builder;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -37,12 +39,12 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.Serializer;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
@@ -112,7 +114,9 @@ public class CloudObjectsTest {
                       KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()),
                       IntervalWindow.getCoder()))
               .add(VarLongCoder.of())
-              .add(ByteArrayCoder.of());
+              .add(ByteArrayCoder.of())
+              .add(SerializableCoder.of(Record.class))
+              .add(AvroCoder.of(Record.class));
       for (Class<? extends Coder> atomicCoder :
           DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) {
         dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build());
@@ -126,16 +130,15 @@ public class CloudObjectsTest {
     @Test
     public void toAndFromCloudObject() throws Exception {
       CloudObject cloudObject = CloudObjects.asCloudObject(coder);
-      Coder<?> reconstructed = Serializer.deserialize(cloudObject, Coder.class);
       Coder<?> fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject);
 
-      assertEquals(coder.getClass(), reconstructed.getClass());
       assertEquals(coder.getClass(), fromCloudObject.getClass());
-      assertEquals(coder, reconstructed);
       assertEquals(coder, fromCloudObject);
     }
   }
 
+  private static class Record implements Serializable {}
+
   private static class ObjectCoder extends CustomCoder<Object> {
     @Override
     public void encode(Object value, OutputStream outStream, Context context)

http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
index 9ec7aec..8e90a5f 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -229,6 +229,10 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
     return protoMessageClass;
   }
 
+  public Set<Class<?>> getExtensionHosts() {
+    return extensionHostClasses;
+  }
+
   /**
    * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages
    * to {@code T} registered with this {@link ProtoCoder}.


[2/2] beam git commit: [BEAM-2020] Add CloudObjectTranslators for Avro, Serializable

Posted by lc...@apache.org.
[BEAM-2020] Add CloudObjectTranslators for Avro, Serializable

This closes #2727


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4867c995
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4867c995
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4867c995

Branch: refs/heads/master
Commit: 4867c995fac2918fed241e2335cf7ea47585ffe4
Parents: cd813fb 2ce9756
Author: Luke Cwik <lc...@google.com>
Authored: Mon May 1 16:45:42 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 1 16:45:42 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |  5 ++
 .../util/AvroCoderCloudObjectTranslator.java    | 62 +++++++++++++++++++
 ...aultCoderCloudObjectTranslatorRegistrar.java |  2 +
 .../SerializableCoderCloudObjectTranslator.java | 65 ++++++++++++++++++++
 .../runners/dataflow/util/CloudObjectsTest.java | 13 ++--
 .../sdk/extensions/protobuf/ProtoCoder.java     |  4 ++
 6 files changed, 146 insertions(+), 5 deletions(-)
----------------------------------------------------------------------