You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/04 17:28:13 UTC

beam git commit: Use CloudObject encodings in the Beam Fn Harness

Repository: beam
Updated Branches:
  refs/heads/master f8ae1185c -> 48c8ed176


Use CloudObject encodings in the Beam Fn Harness

This mostly reverts commit e2b5d6ea8a5d41ac27245a4999f59a73dfe24c43.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48c8ed17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48c8ed17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48c8ed17

Branch: refs/heads/master
Commit: 48c8ed17623b3f36b7aeebb2e2ca585a259d2fec
Parents: f8ae118
Author: Thomas Groh <tg...@google.com>
Authored: Thu May 4 09:08:28 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 10:28:03 2017 -0700

----------------------------------------------------------------------
 sdks/java/harness/pom.xml                       | 10 ------
 .../beam/runners/core/BeamFnDataReadRunner.java | 21 ++++++++++---
 .../runners/core/BeamFnDataWriteRunner.java     | 22 +++++++++++---
 .../control/ProcessBundleHandlerTest.java       | 32 +++++++++-----------
 .../runners/core/BeamFnDataReadRunnerTest.java  | 16 ++++++----
 .../runners/core/BeamFnDataWriteRunnerTest.java | 16 ++++++----
 6 files changed, 68 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 73f08cc..d00dfe4 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -65,16 +65,6 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-core-construction-java</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-runner-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
       <version>${project.version}</version>
       <classifier>tests</classifier>

http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
index 7c4a5e8..e6928d1 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
@@ -18,8 +18,10 @@
 
 package org.apache.beam.runners.core;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -28,9 +30,9 @@ import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.construction.Coders;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
@@ -47,6 +49,8 @@ import org.slf4j.LoggerFactory;
 public class BeamFnDataReadRunner<OutputT> {
   private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
 
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
   private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
   private final Supplier<String> processBundleInstructionIdSupplier;
@@ -71,12 +75,19 @@ public class BeamFnDataReadRunner<OutputT> {
     this.beamFnDataClientFactory = beamFnDataClientFactory;
     this.consumers = ImmutableList.copyOf(FluentIterable.concat(outputMap.values()));
 
-    MessageWithComponents runnerApiCoder =
-        coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class);
     @SuppressWarnings("unchecked")
     Coder<WindowedValue<OutputT>> coder =
         (Coder<WindowedValue<OutputT>>)
-            Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents());
+            CloudObjects.coderFromCloudObject(
+                CloudObject.fromSpec(
+                    OBJECT_MAPPER.readValue(
+                        coderSpec
+                            .getFunctionSpec()
+                            .getData()
+                            .unpack(BytesValue.class)
+                            .getValue()
+                            .newInput(),
+                        Map.class)));
     this.coder = coder;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
index 3a11def..a78da5d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
@@ -18,14 +18,17 @@
 
 package org.apache.beam.runners.core;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.BytesValue;
 import java.io.IOException;
+import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.construction.Coders;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
@@ -37,6 +40,8 @@ import org.apache.beam.sdk.values.KV;
  * For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish.
  */
 public class BeamFnDataWriteRunner<InputT> {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
   private final BeamFnApi.Target outputTarget;
   private final Coder<WindowedValue<InputT>> coder;
@@ -58,12 +63,19 @@ public class BeamFnDataWriteRunner<InputT> {
     this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
     this.outputTarget = outputTarget;
 
-    MessageWithComponents runnerApiCoder =
-        coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class);
     @SuppressWarnings("unchecked")
     Coder<WindowedValue<InputT>> coder =
         (Coder<WindowedValue<InputT>>)
-            Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents());
+            CloudObjects.coderFromCloudObject(
+                CloudObject.fromSpec(
+                    OBJECT_MAPPER.readValue(
+                        coderSpec
+                            .getFunctionSpec()
+                            .getData()
+                            .unpack(BytesValue.class)
+                            .getValue()
+                            .newInput(),
+                        Map.class)));
     this.coder = coder;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 654f989..1cdd087 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
@@ -61,7 +62,7 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.construction.Coders;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -91,6 +92,8 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link ProcessBundleHandler}. */
 @RunWith(JUnit4.class)
 public class ProcessBundleHandlerTest {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   private static final Coder<WindowedValue<String>> STRING_CODER =
       WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
   private static final String LONG_CODER_SPEC_ID = "998L";
@@ -105,23 +108,18 @@ public class ProcessBundleHandlerTest {
   static {
     try {
       STRING_CODER_SPEC =
-          BeamFnApi.Coder.newBuilder()
-              .setFunctionSpec(
-                  BeamFnApi.FunctionSpec.newBuilder()
-                      .setId(STRING_CODER_SPEC_ID)
-                      .setData(Any.pack(Coders.toProto(STRING_CODER))))
-              .build();
+          BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+          .setId(STRING_CODER_SPEC_ID)
+          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+              OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER)))).build())))
+          .build();
       LONG_CODER_SPEC =
-          BeamFnApi.Coder.newBuilder()
-              .setFunctionSpec(
-                  BeamFnApi.FunctionSpec.newBuilder()
-                      .setId(STRING_CODER_SPEC_ID)
-                      .setData(
-                          Any.pack(
-                              Coders.toProto(
-                                  WindowedValue.getFullCoder(
-                                      VarLongCoder.of(), GlobalWindow.Coder.INSTANCE)))))
-              .build();
+          BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+          .setId(STRING_CODER_SPEC_ID)
+          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+              OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(WindowedValue.getFullCoder(
+                  VarLongCoder.of(), GlobalWindow.Coder.INSTANCE))))).build())))
+          .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
index 04a3615..a3d4a1b 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
@@ -27,10 +27,13 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,7 +48,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.test.TestExecutors;
 import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.construction.Coders;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -64,6 +67,8 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link BeamFnDataReadRunner}. */
 @RunWith(JUnit4.class)
 public class BeamFnDataReadRunnerTest {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
       .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
   private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()
@@ -73,11 +78,10 @@ public class BeamFnDataReadRunnerTest {
   private static final BeamFnApi.Coder CODER_SPEC;
   static {
     try {
-      CODER_SPEC =
-          BeamFnApi.Coder.newBuilder()
-              .setFunctionSpec(
-                  BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER))))
-              .build();
+      CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+          .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+              OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))).build())))
+          .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
index 9e50cd0..3383966 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
@@ -28,14 +28,17 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.construction.Coders;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -52,6 +55,8 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link BeamFnDataWriteRunner}. */
 @RunWith(JUnit4.class)
 public class BeamFnDataWriteRunnerTest {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
       .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
   private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()
@@ -61,11 +66,10 @@ public class BeamFnDataWriteRunnerTest {
   private static final BeamFnApi.Coder CODER_SPEC;
   static {
     try {
-      CODER_SPEC =
-          BeamFnApi.Coder.newBuilder()
-              .setFunctionSpec(
-                  BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER))))
-              .build();
+      CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+      .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+          OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))).build())))
+      .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }