You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/04 17:28:13 UTC
beam git commit: Use CloudObject encodings in the Beam Fn Harness
Repository: beam
Updated Branches:
refs/heads/master f8ae1185c -> 48c8ed176
Use CloudObject encodings in the Beam Fn Harness
This mostly reverts commit e2b5d6ea8a5d41ac27245a4999f59a73dfe24c43.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48c8ed17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48c8ed17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48c8ed17
Branch: refs/heads/master
Commit: 48c8ed17623b3f36b7aeebb2e2ca585a259d2fec
Parents: f8ae118
Author: Thomas Groh <tg...@google.com>
Authored: Thu May 4 09:08:28 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 10:28:03 2017 -0700
----------------------------------------------------------------------
sdks/java/harness/pom.xml | 10 ------
.../beam/runners/core/BeamFnDataReadRunner.java | 21 ++++++++++---
.../runners/core/BeamFnDataWriteRunner.java | 22 +++++++++++---
.../control/ProcessBundleHandlerTest.java | 32 +++++++++-----------
.../runners/core/BeamFnDataReadRunnerTest.java | 16 ++++++----
.../runners/core/BeamFnDataWriteRunnerTest.java | 16 ++++++----
6 files changed, 68 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 73f08cc..d00dfe4 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -65,16 +65,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-construction-java</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-common-runner-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
index 7c4a5e8..e6928d1 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
@@ -18,8 +18,10 @@
package org.apache.beam.runners.core;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
+import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
@@ -28,9 +30,9 @@ import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.construction.Coders;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
@@ -47,6 +49,8 @@ import org.slf4j.LoggerFactory;
public class BeamFnDataReadRunner<OutputT> {
private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
private final Supplier<String> processBundleInstructionIdSupplier;
@@ -71,12 +75,19 @@ public class BeamFnDataReadRunner<OutputT> {
this.beamFnDataClientFactory = beamFnDataClientFactory;
this.consumers = ImmutableList.copyOf(FluentIterable.concat(outputMap.values()));
- MessageWithComponents runnerApiCoder =
- coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class);
@SuppressWarnings("unchecked")
Coder<WindowedValue<OutputT>> coder =
(Coder<WindowedValue<OutputT>>)
- Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents());
+ CloudObjects.coderFromCloudObject(
+ CloudObject.fromSpec(
+ OBJECT_MAPPER.readValue(
+ coderSpec
+ .getFunctionSpec()
+ .getData()
+ .unpack(BytesValue.class)
+ .getValue()
+ .newInput(),
+ Map.class)));
this.coder = coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
index 3a11def..a78da5d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
@@ -18,14 +18,17 @@
package org.apache.beam.runners.core;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.BytesValue;
import java.io.IOException;
+import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.construction.Coders;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -37,6 +40,8 @@ import org.apache.beam.sdk.values.KV;
* For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish.
*/
public class BeamFnDataWriteRunner<InputT> {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
private final BeamFnApi.Target outputTarget;
private final Coder<WindowedValue<InputT>> coder;
@@ -58,12 +63,19 @@ public class BeamFnDataWriteRunner<InputT> {
this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
this.outputTarget = outputTarget;
- MessageWithComponents runnerApiCoder =
- coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class);
@SuppressWarnings("unchecked")
Coder<WindowedValue<InputT>> coder =
(Coder<WindowedValue<InputT>>)
- Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents());
+ CloudObjects.coderFromCloudObject(
+ CloudObject.fromSpec(
+ OBJECT_MAPPER.readValue(
+ coderSpec
+ .getFunctionSpec()
+ .getData()
+ .unpack(BytesValue.class)
+ .getValue()
+ .newInput(),
+ Map.class)));
this.coder = coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 654f989..1cdd087 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
@@ -61,7 +62,7 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.construction.Coders;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -91,6 +92,8 @@ import org.mockito.MockitoAnnotations;
/** Tests for {@link ProcessBundleHandler}. */
@RunWith(JUnit4.class)
public class ProcessBundleHandlerTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private static final Coder<WindowedValue<String>> STRING_CODER =
WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
private static final String LONG_CODER_SPEC_ID = "998L";
@@ -105,23 +108,18 @@ public class ProcessBundleHandlerTest {
static {
try {
STRING_CODER_SPEC =
- BeamFnApi.Coder.newBuilder()
- .setFunctionSpec(
- BeamFnApi.FunctionSpec.newBuilder()
- .setId(STRING_CODER_SPEC_ID)
- .setData(Any.pack(Coders.toProto(STRING_CODER))))
- .build();
+ BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+ .setId(STRING_CODER_SPEC_ID)
+ .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+ OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER)))).build())))
+ .build();
LONG_CODER_SPEC =
- BeamFnApi.Coder.newBuilder()
- .setFunctionSpec(
- BeamFnApi.FunctionSpec.newBuilder()
- .setId(STRING_CODER_SPEC_ID)
- .setData(
- Any.pack(
- Coders.toProto(
- WindowedValue.getFullCoder(
- VarLongCoder.of(), GlobalWindow.Coder.INSTANCE)))))
- .build();
+ BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+ .setId(STRING_CODER_SPEC_ID)
+ .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+ OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(WindowedValue.getFullCoder(
+ VarLongCoder.of(), GlobalWindow.Coder.INSTANCE))))).build())))
+ .build();
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
index 04a3615..a3d4a1b 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
@@ -27,10 +27,13 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -45,7 +48,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.test.TestExecutors;
import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.construction.Coders;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -64,6 +67,8 @@ import org.mockito.MockitoAnnotations;
/** Tests for {@link BeamFnDataReadRunner}. */
@RunWith(JUnit4.class)
public class BeamFnDataReadRunnerTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
.setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()
@@ -73,11 +78,10 @@ public class BeamFnDataReadRunnerTest {
private static final BeamFnApi.Coder CODER_SPEC;
static {
try {
- CODER_SPEC =
- BeamFnApi.Coder.newBuilder()
- .setFunctionSpec(
- BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER))))
- .build();
+ CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+ .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+ OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))).build())))
+ .build();
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/48c8ed17/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
index 9e50cd0..3383966 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
@@ -28,14 +28,17 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.construction.Coders;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -52,6 +55,8 @@ import org.mockito.MockitoAnnotations;
/** Tests for {@link BeamFnDataWriteRunner}. */
@RunWith(JUnit4.class)
public class BeamFnDataWriteRunnerTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
.setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
private static final BeamFnApi.FunctionSpec FUNCTION_SPEC = BeamFnApi.FunctionSpec.newBuilder()
@@ -61,11 +66,10 @@ public class BeamFnDataWriteRunnerTest {
private static final BeamFnApi.Coder CODER_SPEC;
static {
try {
- CODER_SPEC =
- BeamFnApi.Coder.newBuilder()
- .setFunctionSpec(
- BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER))))
- .build();
+ CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
+ .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
+ OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))).build())))
+ .build();
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}