You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/29 03:43:48 UTC

[2/3] beam git commit: Use Runner API Encodings in the Fn API

Use Runner API Encodings in the Fn API

Removes uses of Coder.toCloudObject


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e2b5d6ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e2b5d6ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e2b5d6ea

Branch: refs/heads/master
Commit: e2b5d6ea8a5d41ac27245a4999f59a73dfe24c43
Parents: ed0b63a
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 28 18:06:38 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 28 20:16:21 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/BeamFnDataReadRunner.java | 14 +++++-----
 .../runners/core/BeamFnDataWriteRunner.java     | 15 +++++------
 .../control/ProcessBundleHandlerTest.java       | 28 ++++++++++++--------
 .../runners/core/BeamFnDataReadRunnerTest.java  | 12 ++++-----
 .../runners/core/BeamFnDataWriteRunnerTest.java | 12 ++++-----
 5 files changed, 43 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
index 034ef84..805d480 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.core;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -30,8 +29,9 @@ import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
@@ -73,12 +73,12 @@ public class BeamFnDataReadRunner<OutputT> {
     this.beamFnDataClientFactory = beamFnDataClientFactory;
     this.consumers = ImmutableList.copyOf(FluentIterable.concat(outputMap.values()));
 
+    MessageWithComponents runnerApiCoder =
+        coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class);
     @SuppressWarnings("unchecked")
-    Coder<WindowedValue<OutputT>> coder = Serializer.deserialize(
-        OBJECT_MAPPER.readValue(
-            coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(),
-            Map.class),
-        Coder.class);
+    Coder<WindowedValue<OutputT>> coder =
+        (Coder<WindowedValue<OutputT>>)
+            Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents());
     this.coder = coder;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
index 54fd626..0ba09e3 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
@@ -19,15 +19,14 @@
 package org.apache.beam.runners.core;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
-import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
@@ -61,12 +60,12 @@ public class BeamFnDataWriteRunner<InputT> {
     this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
     this.outputTarget = outputTarget;
 
+    MessageWithComponents runnerApiCoder =
+        coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class);
     @SuppressWarnings("unchecked")
-    Coder<WindowedValue<InputT>> coder = Serializer.deserialize(
-        OBJECT_MAPPER.readValue(
-            coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(),
-            Map.class),
-        Coder.class);
+    Coder<WindowedValue<InputT>> coder =
+        (Coder<WindowedValue<InputT>>)
+            Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents());
     this.coder = coder;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index bd2fba9..5987267 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -62,6 +62,7 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -106,18 +107,23 @@ public class ProcessBundleHandlerTest {
   static {
     try {
       STRING_CODER_SPEC =
-          BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-          .setId(STRING_CODER_SPEC_ID)
-          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-              OBJECT_MAPPER.writeValueAsBytes(STRING_CODER.asCloudObject()))).build())))
-          .build();
+          BeamFnApi.Coder.newBuilder()
+              .setFunctionSpec(
+                  BeamFnApi.FunctionSpec.newBuilder()
+                      .setId(STRING_CODER_SPEC_ID)
+                      .setData(Any.pack(Coders.toProto(STRING_CODER))))
+              .build();
       LONG_CODER_SPEC =
-          BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-          .setId(STRING_CODER_SPEC_ID)
-          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-              OBJECT_MAPPER.writeValueAsBytes(WindowedValue.getFullCoder(
-                  VarLongCoder.of(), GlobalWindow.Coder.INSTANCE).asCloudObject()))).build())))
-          .build();
+          BeamFnApi.Coder.newBuilder()
+              .setFunctionSpec(
+                  BeamFnApi.FunctionSpec.newBuilder()
+                      .setId(STRING_CODER_SPEC_ID)
+                      .setData(
+                          Any.pack(
+                              Coders.toProto(
+                                  WindowedValue.getFullCoder(
+                                      VarLongCoder.of(), GlobalWindow.Coder.INSTANCE)))))
+              .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
index 0cc5ef9..0d036fe 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
@@ -32,8 +32,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -48,6 +46,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.test.TestExecutors;
 import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -76,10 +75,11 @@ public class BeamFnDataReadRunnerTest {
   private static final BeamFnApi.Coder CODER_SPEC;
   static {
     try {
-      CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-              OBJECT_MAPPER.writeValueAsBytes(CODER.asCloudObject()))).build())))
-          .build();
+      CODER_SPEC =
+          BeamFnApi.Coder.newBuilder()
+              .setFunctionSpec(
+                  BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER))))
+              .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
index 378567a..50fee7a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
@@ -30,14 +30,13 @@ import static org.mockito.Mockito.when;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -64,10 +63,11 @@ public class BeamFnDataWriteRunnerTest {
   private static final BeamFnApi.Coder CODER_SPEC;
   static {
     try {
-      CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
-      .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-          OBJECT_MAPPER.writeValueAsBytes(CODER.asCloudObject()))).build())))
-      .build();
+      CODER_SPEC =
+          BeamFnApi.Coder.newBuilder()
+              .setFunctionSpec(
+                  BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER))))
+              .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }