You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/25 18:15:18 UTC
[1/2] beam git commit: This closes #2642
Repository: beam
Updated Branches:
refs/heads/master 9fdefe1ad -> 1a79635fe
This closes #2642
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a79635f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a79635f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a79635f
Branch: refs/heads/master
Commit: 1a79635fe1a7ad2733ad12b3837282ad57152feb
Parents: 9fdefe1 e9cfb48
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 25 11:15:12 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 11:15:12 2017 -0700
----------------------------------------------------------------------
.../dataflow/util/CloudObjectTranslator.java | 29 +++++++
.../runners/dataflow/util/CloudObjects.java | 84 ++++++++++++++++++++
.../runners/dataflow/util/CloudObjectsTest.java | 83 +++++++++++++++++++
.../java/org/apache/beam/sdk/coders/Coder.java | 1 +
4 files changed, 197 insertions(+)
----------------------------------------------------------------------
[2/2] beam git commit: Add Cloud Object Translators for Coders
Posted by tg...@apache.org.
Add Cloud Object Translators for Coders
Currently unused. These will replace the coder.asCloudObject within
Dataflow.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e9cfb487
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e9cfb487
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e9cfb487
Branch: refs/heads/master
Commit: e9cfb487f45c3b7df2d6fcb8be7e0f3367f7b0a4
Parents: 9fdefe1
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 21 10:56:15 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 11:15:12 2017 -0700
----------------------------------------------------------------------
.../dataflow/util/CloudObjectTranslator.java | 29 +++++++
.../runners/dataflow/util/CloudObjects.java | 84 ++++++++++++++++++++
.../runners/dataflow/util/CloudObjectsTest.java | 83 +++++++++++++++++++
.../java/org/apache/beam/sdk/coders/Coder.java | 1 +
4 files changed, 197 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e9cfb487/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
new file mode 100644
index 0000000..1672a26
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import org.apache.beam.sdk.util.CloudObject;
+
+/**
+ * A translator that takes an object of type {@code T} and creates a {@link CloudObject} which can
+ * be converted back to the original object.
+ */
+public interface CloudObjectTranslator<T> {
+ CloudObject toCloudObject(T target); // returns the CloudObject representation of target
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/e9cfb487/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
new file mode 100644
index 0000000..8978849
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.StringUtils;
+import org.apache.beam.sdk.util.Structs;
+
+/** Utilities for converting a {@link Coder} to a {@link CloudObject} and back again. */
+public class CloudObjects {
+ private CloudObjects() {} // private: this class only exposes static helpers
+
+ static final Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
+ CODER_TRANSLATORS = populateCoderInitializers(); // looked up by the coder's exact class
+
+ private static Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
+ populateCoderInitializers() {
+ ImmutableMap.Builder<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> builder =
+ ImmutableMap.builder();
+ // TODO: Implement. Nothing is registered yet, so the returned map is currently empty.
+ return builder.build();
+ }
+
+ public static CloudObject asCloudObject(Coder<?> coder) { // throws IllegalArgumentException if unsupported
+ CloudObjectTranslator<Coder<?>> translator =
+ (CloudObjectTranslator<Coder<?>>) CODER_TRANSLATORS.get(coder.getClass()); // unchecked cast
+ if (translator != null) { // a registered translator takes precedence
+ return translator.toCloudObject(coder);
+ } else if (coder instanceof CustomCoder) { // fallback: embed the serialized coder itself
+ return customCoderAsCloudObject((CustomCoder<?>) coder);
+ }
+ throw new IllegalArgumentException( // NOTE(review): message formats Coder.class, not coder.getClass()
+ String.format(
+ "Non-Custom %s with no registered %s", Coder.class, CloudObjectTranslator.class));
+ }
+
+ private static CloudObject customCoderAsCloudObject(CustomCoder<?> coder) {
+ CloudObject result = CloudObject.forClass(CustomCoder.class); // presumably the name coderFromCloudObject matches on
+ Structs.addString(result, "type", coder.getClass().getName()); // concrete coder class name
+ Structs.addString(
+ result,
+ "serialized_coder",
+ StringUtils.byteArrayToJsonString(SerializableUtils.serializeToByteArray(coder)));
+
+ return result;
+ }
+
+ public static Coder<?> coderFromCloudObject(CloudObject cloudObject) { // reverse of asCloudObject
+ if (cloudObject.getClassName().equals(CustomCoder.class.getName())) { // only CustomCoder is handled so far
+ return customCoderFromCloudObject(cloudObject);
+ }
+ throw new IllegalArgumentException(
+ String.format("Unknown Cloud Object Class Name %s", cloudObject.getClassName()));
+ }
+
+ private static Coder<?> customCoderFromCloudObject(CloudObject cloudObject) {
+ String type = Structs.getString(cloudObject, "type"); // written by customCoderAsCloudObject
+ String serializedCoder = Structs.getString(cloudObject, "serialized_coder");
+ return (CustomCoder<?>)
+ SerializableUtils.deserializeFromByteArray( // NOTE(review): role of the 'type' argument is not visible here; confirm
+ StringUtils.jsonStringToByteArray(serializedCoder), type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/e9cfb487/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
new file mode 100644
index 0000000..c274bc3
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests that each parameterized {@link Coder} survives a round trip through {@link CloudObjects}.
+ */
+@RunWith(Parameterized.class)
+public class CloudObjectsTest {
+ @Parameters(name = "{index}: {0}")
+ public static Iterable<Coder<?>> data() {
+ return ImmutableList.<Coder<?>>builder().add(new RecordCoder()).build(); // only a CustomCoder so far
+ }
+
+ @Parameter(0)
+ public Coder<?> coder;
+
+ @Test
+ public void toAndFromCloudObject() throws Exception {
+ CloudObject cloudObject = CloudObjects.asCloudObject(coder);
+ Coder<?> reconstructed = CloudObjects.coderFromCloudObject(cloudObject);
+
+ assertEquals(coder.getClass(), reconstructed.getClass()); // same concrete coder class
+ assertEquals(coder, reconstructed); // and equal according to the coder's own equals()
+ }
+
+ static class Record implements Serializable {} // empty element type used only by RecordCoder
+
+ private static class RecordCoder extends CustomCoder<Record> { // encodes nothing, decodes a new Record
+ @Override
+ public void encode(Record value, OutputStream outStream, Context context)
+ throws CoderException, IOException {}
+
+ @Override
+ public Record decode(InputStream inStream, Context context)
+ throws CoderException, IOException {
+ return new Record();
+ }
+
+ @Override
+ public boolean equals(Object other) { // equality is by class only, so a reconstructed copy compares equal
+ return other != null && getClass().equals(other.getClass());
+ }
+
+ @Override
+ public int hashCode() { // consistent with equals(): depends only on the class
+ return getClass().hashCode();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/e9cfb487/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 779961e..28d87e3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -148,6 +148,7 @@ public interface Coder<T> extends Serializable {
/**
* Returns the {@link CloudObject} that represents this {@code Coder}.
*/
+ @Deprecated
CloudObject asCloudObject();
/**