You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/25 18:15:18 UTC

[1/2] beam git commit: This closes #2642

Repository: beam
Updated Branches:
  refs/heads/master 9fdefe1ad -> 1a79635fe


This closes #2642


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a79635f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a79635f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a79635f

Branch: refs/heads/master
Commit: 1a79635fe1a7ad2733ad12b3837282ad57152feb
Parents: 9fdefe1 e9cfb48
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 25 11:15:12 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 11:15:12 2017 -0700

----------------------------------------------------------------------
 .../dataflow/util/CloudObjectTranslator.java    | 29 +++++++
 .../runners/dataflow/util/CloudObjects.java     | 84 ++++++++++++++++++++
 .../runners/dataflow/util/CloudObjectsTest.java | 83 +++++++++++++++++++
 .../java/org/apache/beam/sdk/coders/Coder.java  |  1 +
 4 files changed, 197 insertions(+)
----------------------------------------------------------------------



[2/2] beam git commit: Add Cloud Object Translators for Coders

Posted by tg...@apache.org.
Add Cloud Object Translators for Coders

Currently unused. These will replace the coder.asCloudObject within
Dataflow.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e9cfb487
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e9cfb487
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e9cfb487

Branch: refs/heads/master
Commit: e9cfb487f45c3b7df2d6fcb8be7e0f3367f7b0a4
Parents: 9fdefe1
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 21 10:56:15 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 25 11:15:12 2017 -0700

----------------------------------------------------------------------
 .../dataflow/util/CloudObjectTranslator.java    | 29 +++++++
 .../runners/dataflow/util/CloudObjects.java     | 84 ++++++++++++++++++++
 .../runners/dataflow/util/CloudObjectsTest.java | 83 +++++++++++++++++++
 .../java/org/apache/beam/sdk/coders/Coder.java  |  1 +
 4 files changed, 197 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e9cfb487/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
new file mode 100644
index 0000000..1672a26
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import org.apache.beam.sdk.util.CloudObject;
+
+/**
+ * An translator that takes an object and creates a {@link CloudObject} which can be converted back
+ * to the original object.
+ */
+public interface CloudObjectTranslator<T> {
+  CloudObject toCloudObject(T target);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e9cfb487/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
new file mode 100644
index 0000000..8978849
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.StringUtils;
+import org.apache.beam.sdk.util.Structs;
+
+/** Utilities for converting an object to a {@link CloudObject}. */
+public class CloudObjects {
+  private CloudObjects() {}
+
+  static final Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
+      CODER_TRANSLATORS = populateCoderInitializers();
+
+  private static Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
+      populateCoderInitializers() {
+    ImmutableMap.Builder<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> builder =
+        ImmutableMap.builder();
+    // TODO: Implement
+    return builder.build();
+  }
+
+  public static CloudObject asCloudObject(Coder<?> coder) {
+    CloudObjectTranslator<Coder<?>> translator =
+        (CloudObjectTranslator<Coder<?>>) CODER_TRANSLATORS.get(coder.getClass());
+    if (translator != null) {
+      return translator.toCloudObject(coder);
+    } else if (coder instanceof CustomCoder) {
+      return customCoderAsCloudObject((CustomCoder<?>) coder);
+    }
+    throw new IllegalArgumentException(
+        String.format(
+            "Non-Custom %s with no registered %s", Coder.class, CloudObjectTranslator.class));
+  }
+
+  private static CloudObject customCoderAsCloudObject(CustomCoder<?> coder) {
+    CloudObject result = CloudObject.forClass(CustomCoder.class);
+    Structs.addString(result, "type", coder.getClass().getName());
+    Structs.addString(
+        result,
+        "serialized_coder",
+        StringUtils.byteArrayToJsonString(SerializableUtils.serializeToByteArray(coder)));
+
+    return result;
+  }
+
+  public static Coder<?> coderFromCloudObject(CloudObject cloudObject) {
+    if (cloudObject.getClassName().equals(CustomCoder.class.getName())) {
+      return customCoderFromCloudObject(cloudObject);
+    }
+    throw new IllegalArgumentException(
+        String.format("Unknown Cloud Object Class Name %s", cloudObject.getClassName()));
+  }
+
+  private static Coder<?> customCoderFromCloudObject(CloudObject cloudObject) {
+    String type = Structs.getString(cloudObject, "type");
+    String serializedCoder = Structs.getString(cloudObject, "serialized_coder");
+    return (CustomCoder<?>)
+        SerializableUtils.deserializeFromByteArray(
+            StringUtils.jsonStringToByteArray(serializedCoder), type);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e9cfb487/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
new file mode 100644
index 0000000..c274bc3
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests for {@link CloudObjects}.
+ */
+@RunWith(Parameterized.class)
+public class CloudObjectsTest {
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<Coder<?>> data() {
+    return ImmutableList.<Coder<?>>builder().add(new RecordCoder()).build();
+  }
+
+  @Parameter(0)
+  public Coder<?> coder;
+
+  @Test
+  public void toAndFromCloudObject() throws Exception {
+    CloudObject cloudObject = CloudObjects.asCloudObject(coder);
+    Coder<?> reconstructed = CloudObjects.coderFromCloudObject(cloudObject);
+
+    assertEquals(coder.getClass(), reconstructed.getClass());
+    assertEquals(coder, reconstructed);
+  }
+
+  static class Record implements Serializable {}
+
+  private static class RecordCoder extends CustomCoder<Record> {
+    @Override
+    public void encode(Record value, OutputStream outStream, Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(InputStream inStream, Context context)
+        throws CoderException, IOException {
+      return new Record();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other != null && getClass().equals(other.getClass());
+    }
+
+    @Override
+    public int hashCode() {
+      return getClass().hashCode();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e9cfb487/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 779961e..28d87e3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -148,6 +148,7 @@ public interface Coder<T> extends Serializable {
   /**
    * Returns the {@link CloudObject} that represents this {@code Coder}.
    */
+  @Deprecated
   CloudObject asCloudObject();
 
   /**