You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/29 03:43:47 UTC

[1/3] beam git commit: This closes #2773

Repository: beam
Updated Branches:
  refs/heads/master 8cc4d59c2 -> 2b6cb8ca1


This closes #2773


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b6cb8ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b6cb8ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b6cb8ca

Branch: refs/heads/master
Commit: 2b6cb8ca17a8a87aa9db2e5725f018ef564f27d7
Parents: 8cc4d59 e2b5d6e
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 28 20:16:21 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 28 20:16:21 2017 -0700

----------------------------------------------------------------------
 runners/core-construction-java/pom.xml          |  5 ----
 .../beam/runners/core/construction/Coders.java  | 30 +++++++++++---------
 .../runners/core/construction/CodersTest.java   |  3 +-
 .../beam/runners/core/BeamFnDataReadRunner.java | 14 ++++-----
 .../runners/core/BeamFnDataWriteRunner.java     | 15 +++++-----
 .../control/ProcessBundleHandlerTest.java       | 28 +++++++++++-------
 .../runners/core/BeamFnDataReadRunnerTest.java  | 12 ++++----
 .../runners/core/BeamFnDataWriteRunnerTest.java | 12 ++++----
 8 files changed, 62 insertions(+), 57 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Use Runner API Encodings in the Fn API

Posted by tg...@apache.org.
Use Runner API Encodings in the Fn API

Removes uses of Coder.asCloudObject


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e2b5d6ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e2b5d6ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e2b5d6ea

Branch: refs/heads/master
Commit: e2b5d6ea8a5d41ac27245a4999f59a73dfe24c43
Parents: ed0b63a
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 28 18:06:38 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 28 20:16:21 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/BeamFnDataReadRunner.java | 14 +++++-----
 .../runners/core/BeamFnDataWriteRunner.java     | 15 +++++------
 .../control/ProcessBundleHandlerTest.java       | 28 ++++++++++++--------
 .../runners/core/BeamFnDataReadRunnerTest.java  | 12 ++++-----
 .../runners/core/BeamFnDataWriteRunnerTest.java | 12 ++++-----
 5 files changed, 43 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
index 034ef84..805d480 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.core;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -30,8 +29,9 @@ import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
@@ -73,12 +73,12 @@ public class BeamFnDataReadRunner<OutputT> {
     this.beamFnDataClientFactory = beamFnDataClientFactory;
     this.consumers = ImmutableList.copyOf(FluentIterable.concat(outputMap.values()));
 
+    MessageWithComponents runnerApiCoder =
+        coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class);
     @SuppressWarnings("unchecked")
-    Coder<WindowedValue<OutputT>> coder = Serializer.deserialize(
-        OBJECT_MAPPER.readValue(
-            coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(),
-            Map.class),
-        Coder.class);
+    Coder<WindowedValue<OutputT>> coder =
+        (Coder<WindowedValue<OutputT>>)
+            Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents());
     this.coder = coder;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
index 54fd626..0ba09e3 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
@@ -19,15 +19,14 @@
 package org.apache.beam.runners.core;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
-import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
@@ -61,12 +60,12 @@ public class BeamFnDataWriteRunner<InputT> {
     this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
     this.outputTarget = outputTarget;
 
+    MessageWithComponents runnerApiCoder =
+        coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class);
     @SuppressWarnings("unchecked")
-    Coder<WindowedValue<InputT>> coder = Serializer.deserialize(
-        OBJECT_MAPPER.readValue(
-            coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(),
-            Map.class),
-        Coder.class);
+    Coder<WindowedValue<InputT>> coder =
+        (Coder<WindowedValue<InputT>>)
+            Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents());
     this.coder = coder;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index bd2fba9..5987267 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -62,6 +62,7 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -106,18 +107,23 @@ public class ProcessBundleHandlerTest {
   static {
     try {
       STRING_CODER_SPEC =
-          BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-          .setId(STRING_CODER_SPEC_ID)
-          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-              OBJECT_MAPPER.writeValueAsBytes(STRING_CODER.asCloudObject()))).build())))
-          .build();
+          BeamFnApi.Coder.newBuilder()
+              .setFunctionSpec(
+                  BeamFnApi.FunctionSpec.newBuilder()
+                      .setId(STRING_CODER_SPEC_ID)
+                      .setData(Any.pack(Coders.toProto(STRING_CODER))))
+              .build();
       LONG_CODER_SPEC =
-          BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-          .setId(STRING_CODER_SPEC_ID)
-          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-              OBJECT_MAPPER.writeValueAsBytes(WindowedValue.getFullCoder(
-                  VarLongCoder.of(), GlobalWindow.Coder.INSTANCE).asCloudObject()))).build())))
-          .build();
+          BeamFnApi.Coder.newBuilder()
+              .setFunctionSpec(
+                  BeamFnApi.FunctionSpec.newBuilder()
+                      .setId(STRING_CODER_SPEC_ID)
+                      .setData(
+                          Any.pack(
+                              Coders.toProto(
+                                  WindowedValue.getFullCoder(
+                                      VarLongCoder.of(), GlobalWindow.Coder.INSTANCE)))))
+              .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
index 0cc5ef9..0d036fe 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
@@ -32,8 +32,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -48,6 +46,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.test.TestExecutors;
 import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -76,10 +75,11 @@ public class BeamFnDataReadRunnerTest {
   private static final BeamFnApi.Coder CODER_SPEC;
   static {
     try {
-      CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-              OBJECT_MAPPER.writeValueAsBytes(CODER.asCloudObject()))).build())))
-          .build();
+      CODER_SPEC =
+          BeamFnApi.Coder.newBuilder()
+              .setFunctionSpec(
+                  BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER))))
+              .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
index 378567a..50fee7a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
@@ -30,14 +30,13 @@ import static org.mockito.Mockito.when;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -64,10 +63,11 @@ public class BeamFnDataWriteRunnerTest {
   private static final BeamFnApi.Coder CODER_SPEC;
   static {
     try {
-      CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-      .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-          OBJECT_MAPPER.writeValueAsBytes(CODER.asCloudObject()))).build())))
-      .build();
+      CODER_SPEC =
+          BeamFnApi.Coder.newBuilder()
+              .setFunctionSpec(
+                  BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER))))
+              .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }


[3/3] beam git commit: Java-Serialize Coders Directly

Posted by tg...@apache.org.
Java-Serialize Coders Directly

Known Coders will be serialized via a known path. Unknown
Coders will be serialized as Java objects, using Java serialization.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ed0b63a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ed0b63a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ed0b63a4

Branch: refs/heads/master
Commit: ed0b63a4c1c55d58b94ab7574c14d221486f434a
Parents: 8cc4d59
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 28 18:06:13 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 28 20:16:21 2017 -0700

----------------------------------------------------------------------
 runners/core-construction-java/pom.xml          |  5 ----
 .../beam/runners/core/construction/Coders.java  | 30 +++++++++++---------
 .../runners/core/construction/CodersTest.java   |  3 +-
 3 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 854fdc1..b38e602 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -70,11 +70,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
index 8793df4..094f21f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
@@ -44,18 +43,15 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 
 /** Converts to and from Beam Runner API representations of {@link Coder Coders}. */
 public class Coders {
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
   // This URN says that the coder is just a UDF blob this SDK understands
   // TODO: standardize such things
-  public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
+  public static final String JAVA_SERIALIZED_CODER_URN = "urn:beam:coders:javasdk:0.1";
 
   // The URNs for coders which are shared across languages
   @VisibleForTesting
@@ -71,6 +67,15 @@ public class Coders {
           .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1")
           .build();
 
+  public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOException {
+    SdkComponents components = SdkComponents.create();
+    RunnerApi.Coder coderProto = toProto(coder, components);
+    return RunnerApi.MessageWithComponents.newBuilder()
+        .setCoder(coderProto)
+        .setComponents(components.toComponents())
+        .build();
+  }
+
   public static RunnerApi.Coder toProto(
       Coder<?> coder, @SuppressWarnings("unused") SdkComponents components) throws IOException {
     if (KNOWN_CODER_URNS.containsKey(coder.getClass())) {
@@ -108,13 +113,13 @@ public class Coders {
             SdkFunctionSpec.newBuilder()
                 .setSpec(
                     FunctionSpec.newBuilder()
-                        .setUrn(CUSTOM_CODER_URN)
+                        .setUrn(JAVA_SERIALIZED_CODER_URN)
                         .setParameter(
                             Any.pack(
                                 BytesValue.newBuilder()
                                     .setValue(
                                         ByteString.copyFrom(
-                                            OBJECT_MAPPER.writeValueAsBytes(coder.asCloudObject())))
+                                            SerializableUtils.serializeToByteArray(coder)))
                                     .build()))))
         .build();
   }
@@ -122,7 +127,7 @@ public class Coders {
   public static Coder<?> fromProto(RunnerApi.Coder protoCoder, Components components)
       throws IOException {
     String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn();
-    if (coderSpecUrn.equals(CUSTOM_CODER_URN)) {
+    if (coderSpecUrn.equals(JAVA_SERIALIZED_CODER_URN)) {
       return fromCustomCoder(protoCoder, components);
     }
     return fromKnownCoder(protoCoder, components);
@@ -165,8 +170,8 @@ public class Coders {
   private static Coder<?> fromCustomCoder(
       RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components components)
       throws IOException {
-    CloudObject coderCloudObject =
-        OBJECT_MAPPER.readValue(
+    return (Coder<?>)
+        SerializableUtils.deserializeFromByteArray(
             protoCoder
                 .getSpec()
                 .getSpec()
@@ -174,7 +179,6 @@ public class Coders {
                 .unpack(BytesValue.class)
                 .getValue()
                 .toByteArray(),
-            CloudObject.class);
-    return Serializer.deserialize(coderCloudObject, Coder.class);
+            protoCoder.getSpec().getSpec().getUrn());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index ca0fdc9..ecd0fa5 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -133,7 +133,8 @@ public class CodersTest {
       if (KNOWN_CODERS.contains(coder)) {
         for (RunnerApi.Coder encodedCoder : encodedComponents.getCodersMap().values()) {
           assertThat(
-              encodedCoder.getSpec().getSpec().getUrn(), not(equalTo(Coders.CUSTOM_CODER_URN)));
+              encodedCoder.getSpec().getSpec().getUrn(),
+              not(equalTo(Coders.JAVA_SERIALIZED_CODER_URN)));
         }
       }
     }