You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/08/01 21:22:41 UTC
[2/2] beam git commit: Remove References to CloudObject from the Java Harness
Remove References to CloudObject from the Java Harness
Migrates to using the shared Runner API definitions.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/64cf18fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/64cf18fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/64cf18fc
Branch: refs/heads/master
Commit: 64cf18fcdb4237189a5212b6476bdadf73a2ac7f
Parents: 3c81766
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jul 26 15:34:23 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Aug 1 14:22:21 2017 -0700
----------------------------------------------------------------------
.../beam/fn/harness/BeamFnDataReadRunner.java | 27 ++++++++----------
.../beam/fn/harness/BeamFnDataWriteRunner.java | 22 ++++++---------
.../fn/harness/BeamFnDataReadRunnerTest.java | 28 +++++++++----------
.../fn/harness/BeamFnDataWriteRunnerTest.java | 24 ++++++----------
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 29 --------------------
5 files changed, 41 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index e2c17b0..1e611db 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
@@ -35,8 +34,8 @@ import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -91,8 +90,9 @@ public class BeamFnDataReadRunner<OutputT> {
.setPrimitiveTransformReference(pTransformId)
.setName(getOnlyElement(pTransform.getOutputsMap().keySet()))
.build();
- RunnerApi.Coder coderSpec = coders.get(pCollections.get(
- getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
+ RunnerApi.Coder coderSpec =
+ coders.get(
+ pCollections.get(getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers =
(Collection) pCollectionIdsToConsumers.get(
getOnlyElement(pTransform.getOutputsMap().values()));
@@ -102,6 +102,7 @@ public class BeamFnDataReadRunner<OutputT> {
processBundleInstructionId,
target,
coderSpec,
+ coders,
beamFnDataClient,
consumers);
addStartFunction.accept(runner::registerInputLocation);
@@ -124,6 +125,7 @@ public class BeamFnDataReadRunner<OutputT> {
Supplier<String> processBundleInstructionIdSupplier,
BeamFnApi.Target inputTarget,
RunnerApi.Coder coderSpec,
+ Map<String, RunnerApi.Coder> coders,
BeamFnDataClient beamFnDataClientFactory,
Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers)
throws IOException {
@@ -137,17 +139,10 @@ public class BeamFnDataReadRunner<OutputT> {
@SuppressWarnings("unchecked")
Coder<WindowedValue<OutputT>> coder =
(Coder<WindowedValue<OutputT>>)
- CloudObjects.coderFromCloudObject(
- CloudObject.fromSpec(
- OBJECT_MAPPER.readValue(
- coderSpec
- .getSpec()
- .getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
- .newInput(),
- Map.class)));
+ CoderTranslation.fromProto(
+ coderSpec,
+ RehydratedComponents.forComponents(
+ RunnerApi.Components.newBuilder().putAllCoders(coders).build()));
this.coder = coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index eec4dfd..bbed753 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;
@@ -34,8 +33,8 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -93,6 +92,7 @@ public class BeamFnDataWriteRunner<InputT> {
processBundleInstructionId,
target,
coderSpec,
+ coders,
beamFnDataClient);
addStartFunction.accept(runner::registerForOutput);
pCollectionIdsToConsumers.put(
@@ -117,6 +117,7 @@ public class BeamFnDataWriteRunner<InputT> {
Supplier<String> processBundleInstructionIdSupplier,
BeamFnApi.Target outputTarget,
RunnerApi.Coder coderSpec,
+ Map<String, RunnerApi.Coder> coders,
BeamFnDataClient beamFnDataClientFactory)
throws IOException {
this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
@@ -128,17 +129,10 @@ public class BeamFnDataWriteRunner<InputT> {
@SuppressWarnings("unchecked")
Coder<WindowedValue<InputT>> coder =
(Coder<WindowedValue<InputT>>)
- CloudObjects.coderFromCloudObject(
- CloudObject.fromSpec(
- OBJECT_MAPPER.readValue(
- coderSpec
- .getSpec()
- .getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
- .newInput(),
- Map.class)));
+ CoderTranslation.fromProto(
+ coderSpec,
+ RehydratedComponents.forComponents(
+ RunnerApi.Components.newBuilder().putAllCoders(coders).build()));
this.coder = coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index a7c6666..d712f5f 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
@@ -39,8 +38,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -56,10 +53,11 @@ import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.fn.harness.test.TestExecutors;
import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
@@ -79,7 +77,6 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class BeamFnDataReadRunnerTest {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
.setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
@@ -88,19 +85,19 @@ public class BeamFnDataReadRunnerTest {
WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
private static final String CODER_SPEC_ID = "string-coder-id";
private static final RunnerApi.Coder CODER_SPEC;
+ private static final RunnerApi.Components COMPONENTS;
private static final String URN = "urn:org.apache.beam:source:runner:0.1";
static {
try {
- CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec(
- RunnerApi.SdkFunctionSpec.newBuilder().setSpec(
- RunnerApi.FunctionSpec.newBuilder().setParameter(
- Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER))))
- .build()))
- .build())
- .build())
- .build();
+ MessageWithComponents coderAndComponents = CoderTranslation.toProto(CODER);
+ CODER_SPEC = coderAndComponents.getCoder();
+ COMPONENTS =
+ coderAndComponents
+ .getComponents()
+ .toBuilder()
+ .putCoders(CODER_SPEC_ID, CODER_SPEC)
+ .build();
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
@@ -150,7 +147,7 @@ public class BeamFnDataReadRunnerTest {
Suppliers.ofInstance(bundleId)::get,
ImmutableMap.of("outputPC",
RunnerApi.PCollection.newBuilder().setCoderId(CODER_SPEC_ID).build()),
- ImmutableMap.of(CODER_SPEC_ID, CODER_SPEC),
+ COMPONENTS.getCodersMap(),
consumers,
startFunctions::add,
finishFunctions::add);
@@ -200,6 +197,7 @@ public class BeamFnDataReadRunnerTest {
bundleId::get,
INPUT_TARGET,
CODER_SPEC,
+ COMPONENTS.getCodersMap(),
mockBeamFnDataClient,
ImmutableList.of(valuesA::add, valuesB::add));
http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index 28838b1..0caf19e 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -32,15 +32,12 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -53,10 +50,11 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
@@ -74,7 +72,6 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class BeamFnDataWriteRunnerTest {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
.setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
@@ -83,19 +80,15 @@ public class BeamFnDataWriteRunnerTest {
private static final Coder<WindowedValue<String>> CODER =
WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
private static final RunnerApi.Coder CODER_SPEC;
+ private static final RunnerApi.Components COMPONENTS;
private static final String URN = "urn:org.apache.beam:sink:runner:0.1";
static {
try {
- CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec(
- RunnerApi.SdkFunctionSpec.newBuilder().setSpec(
- RunnerApi.FunctionSpec.newBuilder().setParameter(
- Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER))))
- .build()))
- .build())
- .build())
- .build();
+ MessageWithComponents coderAndComponents = CoderTranslation.toProto(CODER);
+ CODER_SPEC = coderAndComponents.getCoder();
+ COMPONENTS =
+ coderAndComponents.getComponents().toBuilder().putCoders(CODER_ID, CODER_SPEC).build();
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
@@ -140,7 +133,7 @@ public class BeamFnDataWriteRunnerTest {
Suppliers.ofInstance(bundleId)::get,
ImmutableMap.of("inputPC",
RunnerApi.PCollection.newBuilder().setCoderId(CODER_ID).build()),
- ImmutableMap.of(CODER_ID, CODER_SPEC),
+ COMPONENTS.getCodersMap(),
consumers,
startFunctions::add,
finishFunctions::add);
@@ -201,6 +194,7 @@ public class BeamFnDataWriteRunnerTest {
bundleId::get,
OUTPUT_TARGET,
CODER_SPEC,
+ COMPONENTS.getCodersMap(),
mockBeamFnDataClient);
// Process for bundle id 0
http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 98362a2..e269bcc 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
@@ -35,19 +34,14 @@ import com.google.common.collect.Multimap;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
-import com.google.protobuf.Message;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.ServiceLoader;
import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.runners.core.construction.ParDoTranslation;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -66,28 +60,6 @@ import org.junit.runners.JUnit4;
/** Tests for {@link FnApiDoFnRunner}. */
@RunWith(JUnit4.class)
public class FnApiDoFnRunnerTest {
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final Coder<WindowedValue<String>> STRING_CODER =
- WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
- private static final String STRING_CODER_SPEC_ID = "999L";
- private static final RunnerApi.Coder STRING_CODER_SPEC;
-
- static {
- try {
- STRING_CODER_SPEC = RunnerApi.Coder.newBuilder()
- .setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
- .setSpec(RunnerApi.FunctionSpec.newBuilder()
- .setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER))))
- .build())))
- .build())
- .build();
- } catch (IOException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
-
private static class TestDoFn extends DoFn<String, String> {
private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
private static final TupleTag<String> additionalOutput = new TupleTag<>("output");
@@ -117,7 +89,6 @@ public class FnApiDoFnRunnerTest {
*/
@Test
public void testCreatingAndProcessingDoFn() throws Exception {
- Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
String pTransformId = "pTransformId";
String mainOutputId = "101";
String additionalOutputId = "102";