You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/27 02:08:29 UTC
[2/2] beam git commit: Add Registrars for Coder Cloud Object
Translators
Add Registrars for Coder Cloud Object Translators
This permits registration of a Translator for an arbitrary coder.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d835a6ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d835a6ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d835a6ea
Branch: refs/heads/master
Commit: d835a6ea73afb8d7d48655a7cf2bb3fbbc5ded10
Parents: a0b077f
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 15:31:23 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 26 19:08:24 2017 -0700
----------------------------------------------------------------------
.../dataflow/util/CloudObjectTranslator.java | 8 +++
.../runners/dataflow/util/CloudObjects.java | 53 +++++++++++++-------
.../CoderCloudObjectTranslatorRegistrar.java | 47 +++++++++++++++++
...aultCoderCloudObjectTranslatorRegistrar.java | 45 +++++++++++++++++
.../runners/dataflow/util/CloudObjectsTest.java | 3 +-
5 files changed, 137 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d835a6ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
index 1672a26..d0b111f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
@@ -25,5 +25,13 @@ import org.apache.beam.sdk.util.CloudObject;
* to the original object.
*/
public interface CloudObjectTranslator<T> {
+ /**
+ * Converts the provided object into an equivalent {@link CloudObject}.
+ */
CloudObject toCloudObject(T target);
+
+ /**
+ * Converts back into the original object from a provided {@link CloudObject}.
+ */
+ T fromCloudObject(CloudObject cloudObject);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d835a6ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index 8978849..bc0cc75 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -18,8 +18,11 @@
package org.apache.beam.runners.dataflow.util;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.common.collect.ImmutableMap;
import java.util.Map;
+import java.util.ServiceLoader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.CloudObject;
@@ -32,19 +35,38 @@ public class CloudObjects {
private CloudObjects() {}
static final Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
- CODER_TRANSLATORS = populateCoderInitializers();
+ CODER_TRANSLATORS = populateCoderTranslators();
+ static final Map<String, CloudObjectTranslator<? extends Coder>>
+ CLOUD_OBJECT_CLASS_NAME_TRANSLATORS = populateCloudObjectTranslators();
private static Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
- populateCoderInitializers() {
+ populateCoderTranslators() {
ImmutableMap.Builder<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> builder =
ImmutableMap.builder();
- // TODO: Implement
+ for (CoderCloudObjectTranslatorRegistrar coderRegistrar :
+ ServiceLoader.load(CoderCloudObjectTranslatorRegistrar.class)) {
+ builder.putAll(coderRegistrar.classesToTranslators());
+ }
return builder.build();
}
+ private static Map<String, CloudObjectTranslator<? extends Coder>>
+ populateCloudObjectTranslators() {
+ ImmutableMap.Builder<String, CloudObjectTranslator<? extends Coder>> builder =
+ ImmutableMap.builder();
+ for (CoderCloudObjectTranslatorRegistrar coderRegistrar :
+ ServiceLoader.load(CoderCloudObjectTranslatorRegistrar.class)) {
+ builder.putAll(coderRegistrar.classNamesToTranslators());
+ }
+ return builder.build();
+ }
+
+ /**
+ * Convert the provided {@link Coder} into a {@link CloudObject}.
+ */
public static CloudObject asCloudObject(Coder<?> coder) {
- CloudObjectTranslator<Coder<?>> translator =
- (CloudObjectTranslator<Coder<?>>) CODER_TRANSLATORS.get(coder.getClass());
+ CloudObjectTranslator<Coder> translator =
+ (CloudObjectTranslator<Coder>) CODER_TRANSLATORS.get(coder.getClass());
if (translator != null) {
return translator.toCloudObject(coder);
} else if (coder instanceof CustomCoder) {
@@ -67,18 +89,13 @@ public class CloudObjects {
}
public static Coder<?> coderFromCloudObject(CloudObject cloudObject) {
- if (cloudObject.getClassName().equals(CustomCoder.class.getName())) {
- return customCoderFromCloudObject(cloudObject);
- }
- throw new IllegalArgumentException(
- String.format("Unknown Cloud Object Class Name %s", cloudObject.getClassName()));
- }
-
- private static Coder<?> customCoderFromCloudObject(CloudObject cloudObject) {
- String type = Structs.getString(cloudObject, "type");
- String serializedCoder = Structs.getString(cloudObject, "serialized_coder");
- return (CustomCoder<?>)
- SerializableUtils.deserializeFromByteArray(
- StringUtils.jsonStringToByteArray(serializedCoder), type);
+ CloudObjectTranslator<? extends Coder> translator =
+ CLOUD_OBJECT_CLASS_NAME_TRANSLATORS.get(cloudObject.getClassName());
+ checkArgument(
+ translator != null,
+ "Unknown %s class %s",
+ Coder.class.getSimpleName(),
+ cloudObject.getClassName());
+ return translator.fromCloudObject(cloudObject);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d835a6ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
new file mode 100644
index 0000000..446eb3b
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CoderCloudObjectTranslatorRegistrar.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.CloudObject;
+
+/**
+ * {@link Coder} authors have the ability to automatically have their {@link Coder} registered with
+ * the Dataflow Runner by creating a {@link ServiceLoader} entry and a concrete implementation of
+ * this interface.
+ *
+ * <p>It is optional but recommended to use one of the many build time tools such as
+ * {@link AutoService} to generate the necessary META-INF files automatically.
+ */
+public interface CoderCloudObjectTranslatorRegistrar {
+ /**
+ * Gets a map from {@link Coder} to a {@link CloudObjectTranslator} that can translate that {@link
+ * Coder}.
+ */
+ Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> classesToTranslators();
+
+ /**
+ * Gets a map from the name returned by {@link CloudObject#getClassName()} to a translator that
+ * can convert into the equivalent {@link Coder}.
+ */
+ Map<String, CloudObjectTranslator<? extends Coder>> classNamesToTranslators();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d835a6ea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
new file mode 100644
index 0000000..72fd9ce
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * The {@link CoderCloudObjectTranslatorRegistrar} containing the default collection of
+ * {@link Coder} {@link CloudObjectTranslator Cloud Object Translators}.
+ */
+@AutoService(CoderCloudObjectTranslatorRegistrar.class)
+public class DefaultCoderCloudObjectTranslatorRegistrar
+ implements CoderCloudObjectTranslatorRegistrar {
+ @Override
+ public Map<String, CloudObjectTranslator<? extends Coder>> classNamesToTranslators() {
+ // TODO: Add translators
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
+ classesToTranslators() {
+ // TODO: Add translato
+ return Collections.emptyMap();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d835a6ea/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index c274bc3..7562322 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -42,7 +42,8 @@ import org.junit.runners.Parameterized.Parameters;
public class CloudObjectsTest {
@Parameters(name = "{index}: {0}")
public static Iterable<Coder<?>> data() {
- return ImmutableList.<Coder<?>>builder().add(new RecordCoder()).build();
+ // TODO: Implement when translators are registered with the CoderCloudObjectTranslatorRegsitrar
+ return ImmutableList.<Coder<?>>builder().build();
}
@Parameter(0)