You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/27 02:08:29 UTC

[2/2] beam git commit: Add Registrars for Coder Cloud Object Translators

Add Registrars for Coder Cloud Object Translators

This permits registration of a Translator for an arbitrary coder.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d835a6ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d835a6ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d835a6ea

Branch: refs/heads/master
Commit: d835a6ea73afb8d7d48655a7cf2bb3fbbc5ded10
Parents: a0b077f
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 15:31:23 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 26 19:08:24 2017 -0700

----------------------------------------------------------------------
 .../dataflow/util/CloudObjectTranslator.java    |  8 +++
 .../runners/dataflow/util/CloudObjects.java     | 53 +++++++++++++-------
 .../CoderCloudObjectTranslatorRegistrar.java    | 47 +++++++++++++++++
 ...aultCoderCloudObjectTranslatorRegistrar.java | 45 +++++++++++++++++
 .../runners/dataflow/util/CloudObjectsTest.java |  3 +-
 5 files changed, 137 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d835a6ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
index 1672a26..d0b111f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
@@ -25,5 +25,13 @@ import org.apache.beam.sdk.util.CloudObject;
  * to the original object.
  */
 public interface CloudObjectTranslator<T> {
+  /**
+   * Converts the provided object into an equivalent {@link CloudObject}.
+   */
   CloudObject toCloudObject(T target);
+
+  /**
+   * Converts back into the original object from a provided {@link CloudObject}.
+   */
+  T fromCloudObject(CloudObject cloudObject);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d835a6ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index 8978849..bc0cc75 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -18,8 +18,11 @@
 
 package org.apache.beam.runners.dataflow.util;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import java.util.ServiceLoader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.CloudObject;
@@ -32,19 +35,38 @@ public class CloudObjects {
   private CloudObjects() {}
 
   static final Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
-      CODER_TRANSLATORS = populateCoderInitializers();
+      CODER_TRANSLATORS = populateCoderTranslators();
+  static final Map<String, CloudObjectTranslator<? extends Coder>>
+      CLOUD_OBJECT_CLASS_NAME_TRANSLATORS = populateCloudObjectTranslators();
 
   private static Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
-      populateCoderInitializers() {
+      populateCoderTranslators() {
     ImmutableMap.Builder<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> builder =
         ImmutableMap.builder();
-    // TODO: Implement
+    for (CoderCloudObjectTranslatorRegistrar coderRegistrar :
+        ServiceLoader.load(CoderCloudObjectTranslatorRegistrar.class)) {
+      builder.putAll(coderRegistrar.classesToTranslators());
+    }
     return builder.build();
   }
 
+  private static Map<String, CloudObjectTranslator<? extends Coder>>
+      populateCloudObjectTranslators() {
+    ImmutableMap.Builder<String, CloudObjectTranslator<? extends Coder>> builder =
+        ImmutableMap.builder();
+    for (CoderCloudObjectTranslatorRegistrar coderRegistrar :
+        ServiceLoader.load(CoderCloudObjectTranslatorRegistrar.class)) {
+      builder.putAll(coderRegistrar.classNamesToTranslators());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Convert the provided {@link Coder} into a {@link CloudObject}.
+   */
   public static CloudObject asCloudObject(Coder<?> coder) {
-    CloudObjectTranslator<Coder<?>> translator =
-        (CloudObjectTranslator<Coder<?>>) CODER_TRANSLATORS.get(coder.getClass());
+    CloudObjectTranslator<Coder> translator =
+        (CloudObjectTranslator<Coder>) CODER_TRANSLATORS.get(coder.getClass());
     if (translator != null) {
       return translator.toCloudObject(coder);
     } else if (coder instanceof CustomCoder) {
@@ -67,18 +89,13 @@ public class CloudObjects {
   }
 
   public static Coder<?> coderFromCloudObject(CloudObject cloudObject) {
-    if (cloudObject.getClassName().equals(CustomCoder.class.getName())) {
-      return customCoderFromCloudObject(cloudObject);
-    }
-    throw new IllegalArgumentException(
-        String.format("Unknown Cloud Object Class Name %s", cloudObject.getClassName()));
-  }
-
-  private static Coder<?> customCoderFromCloudObject(CloudObject cloudObject) {
-    String type = Structs.getString(cloudObject, "type");
-    String serializedCoder = Structs.getString(cloudObject, "serialized_coder");
-    return (CustomCoder<?>)
-        SerializableUtils.deserializeFromByteArray(
-            StringUtils.jsonStringToByteArray(serializedCoder), type);
+    CloudObjectTranslator<? extends Coder> translator =
+        CLOUD_OBJECT_CLASS_NAME_TRANSLATORS.get(cloudObject.getClassName());
+    checkArgument(
+        translator != null,
+        "Unknown %s class %s",
+        Coder.class.getSimpleName(),
+        cloudObject.getClassName());
+    return translator.fromCloudObject(cloudObject);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d835a6ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
new file mode 100644
index 0000000..446eb3b
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.CloudObject;
+
+/**
+ * {@link Coder} authors have the ability to automatically have their {@link Coder} registered with
+ * the Dataflow Runner by creating a {@link ServiceLoader} entry and a concrete implementation of
+ * this interface.
+ *
+ * <p>It is optional but recommended to use one of the many build time tools such as
+ * {@link AutoService} to generate the necessary META-INF files automatically.
+ */
+public interface CoderCloudObjectTranslatorRegistrar {
+  /**
+   * Gets a map from each {@link Coder} class to a {@link CloudObjectTranslator} that can translate
+   * instances of that {@link Coder} class.
+   */
+  Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> classesToTranslators();
+
+  /**
+   * Gets a map from the name returned by {@link CloudObject#getClassName()} to a translator that
+   * can convert into the equivalent {@link Coder}.
+   */
+  Map<String, CloudObjectTranslator<? extends Coder>> classNamesToTranslators();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d835a6ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
new file mode 100644
index 0000000..72fd9ce
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * The {@link CoderCloudObjectTranslatorRegistrar} containing the default collection of
+ * {@link Coder} {@link CloudObjectTranslator Cloud Object Translators}.
+ */
+@AutoService(CoderCloudObjectTranslatorRegistrar.class)
+public class DefaultCoderCloudObjectTranslatorRegistrar
+    implements CoderCloudObjectTranslatorRegistrar {
+  @Override
+  public Map<String, CloudObjectTranslator<? extends Coder>> classNamesToTranslators() {
+    // TODO: Add translators
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
+      classesToTranslators() {
+    // TODO: Add translators
+    return Collections.emptyMap();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d835a6ea/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index c274bc3..7562322 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -42,7 +42,8 @@ import org.junit.runners.Parameterized.Parameters;
 public class CloudObjectsTest {
   @Parameters(name = "{index}: {0}")
   public static Iterable<Coder<?>> data() {
-    return ImmutableList.<Coder<?>>builder().add(new RecordCoder()).build();
+    // TODO: Implement when translators are registered with the CoderCloudObjectTranslatorRegistrar
+    return ImmutableList.<Coder<?>>builder().build();
   }
 
   @Parameter(0)