You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/29 03:43:49 UTC

[3/3] beam git commit: Java-Serialize Coders Directly

Java-Serialize Coders Directly

Known Coders will be serialized via a known path. Unknown
Coders will be serialized directly as Java-serialized objects.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ed0b63a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ed0b63a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ed0b63a4

Branch: refs/heads/master
Commit: ed0b63a4c1c55d58b94ab7574c14d221486f434a
Parents: 8cc4d59
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 28 18:06:13 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 28 20:16:21 2017 -0700

----------------------------------------------------------------------
 runners/core-construction-java/pom.xml          |  5 ----
 .../beam/runners/core/construction/Coders.java  | 30 +++++++++++---------
 .../runners/core/construction/CodersTest.java   |  3 +-
 3 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 854fdc1..b38e602 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -70,11 +70,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
index 8793df4..094f21f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
@@ -44,18 +43,15 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 
 /** Converts to and from Beam Runner API representations of {@link Coder Coders}. */
 public class Coders {
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
   // This URN says that the coder is just a UDF blob this SDK understands
   // TODO: standardize such things
-  public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
+  public static final String JAVA_SERIALIZED_CODER_URN = "urn:beam:coders:javasdk:0.1";
 
   // The URNs for coders which are shared across languages
   @VisibleForTesting
@@ -71,6 +67,15 @@ public class Coders {
           .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1")
           .build();
 
+  public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOException {
+    SdkComponents components = SdkComponents.create();
+    RunnerApi.Coder coderProto = toProto(coder, components);
+    return RunnerApi.MessageWithComponents.newBuilder()
+        .setCoder(coderProto)
+        .setComponents(components.toComponents())
+        .build();
+  }
+
   public static RunnerApi.Coder toProto(
       Coder<?> coder, @SuppressWarnings("unused") SdkComponents components) throws IOException {
     if (KNOWN_CODER_URNS.containsKey(coder.getClass())) {
@@ -108,13 +113,13 @@ public class Coders {
             SdkFunctionSpec.newBuilder()
                 .setSpec(
                     FunctionSpec.newBuilder()
-                        .setUrn(CUSTOM_CODER_URN)
+                        .setUrn(JAVA_SERIALIZED_CODER_URN)
                         .setParameter(
                             Any.pack(
                                 BytesValue.newBuilder()
                                     .setValue(
                                         ByteString.copyFrom(
-                                            OBJECT_MAPPER.writeValueAsBytes(coder.asCloudObject())))
+                                            SerializableUtils.serializeToByteArray(coder)))
                                     .build()))))
         .build();
   }
@@ -122,7 +127,7 @@ public class Coders {
   public static Coder<?> fromProto(RunnerApi.Coder protoCoder, Components components)
       throws IOException {
     String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn();
-    if (coderSpecUrn.equals(CUSTOM_CODER_URN)) {
+    if (coderSpecUrn.equals(JAVA_SERIALIZED_CODER_URN)) {
       return fromCustomCoder(protoCoder, components);
     }
     return fromKnownCoder(protoCoder, components);
@@ -165,8 +170,8 @@ public class Coders {
   private static Coder<?> fromCustomCoder(
       RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components components)
       throws IOException {
-    CloudObject coderCloudObject =
-        OBJECT_MAPPER.readValue(
+    return (Coder<?>)
+        SerializableUtils.deserializeFromByteArray(
             protoCoder
                 .getSpec()
                 .getSpec()
@@ -174,7 +179,6 @@ public class Coders {
                 .unpack(BytesValue.class)
                 .getValue()
                 .toByteArray(),
-            CloudObject.class);
-    return Serializer.deserialize(coderCloudObject, Coder.class);
+            protoCoder.getSpec().getSpec().getUrn());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index ca0fdc9..ecd0fa5 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -133,7 +133,8 @@ public class CodersTest {
       if (KNOWN_CODERS.contains(coder)) {
         for (RunnerApi.Coder encodedCoder : encodedComponents.getCodersMap().values()) {
           assertThat(
-              encodedCoder.getSpec().getSpec().getUrn(), not(equalTo(Coders.CUSTOM_CODER_URN)));
+              encodedCoder.getSpec().getSpec().getUrn(),
+              not(equalTo(Coders.JAVA_SERIALIZED_CODER_URN)));
         }
       }
     }