You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/18 20:05:36 UTC
[3/4] beam git commit: Fix split package in SDK harness
Fix split package in SDK harness
The Java SDK harness defined classes both in its own namespace
org.apache.beam.fn.harness and the org.apache.beam.runners.core namespace,
resulting in a split package across multiple jars.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1b4700f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1b4700f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1b4700f
Branch: refs/heads/master
Commit: f1b4700f32c5ea39559145d6f5db3909439f6c80
Parents: 7e4719c
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 17 13:46:46 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 17 13:46:46 2017 -0700
----------------------------------------------------------------------
.../beam/fn/harness/BeamFnDataReadRunner.java | 173 ++++++
.../beam/fn/harness/BeamFnDataWriteRunner.java | 159 ++++++
.../beam/fn/harness/BoundedSourceRunner.java | 167 ++++++
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 548 +++++++++++++++++++
.../fn/harness/PTransformRunnerFactory.java | 81 +++
.../harness/control/ProcessBundleHandler.java | 4 +-
.../beam/runners/core/BeamFnDataReadRunner.java | 173 ------
.../runners/core/BeamFnDataWriteRunner.java | 159 ------
.../beam/runners/core/BoundedSourceRunner.java | 167 ------
.../beam/runners/core/FnApiDoFnRunner.java | 547 ------------------
.../runners/core/PTransformRunnerFactory.java | 81 ---
.../apache/beam/runners/core/package-info.java | 22 -
.../fn/harness/BeamFnDataReadRunnerTest.java | 281 ++++++++++
.../fn/harness/BeamFnDataWriteRunnerTest.java | 269 +++++++++
.../fn/harness/BoundedSourceRunnerTest.java | 187 +++++++
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 210 +++++++
.../control/ProcessBundleHandlerTest.java | 2 +-
.../runners/core/BeamFnDataReadRunnerTest.java | 281 ----------
.../runners/core/BeamFnDataWriteRunnerTest.java | 269 ---------
.../runners/core/BoundedSourceRunnerTest.java | 187 -------
.../beam/runners/core/FnApiDoFnRunnerTest.java | 210 -------
21 files changed, 2078 insertions(+), 2099 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
new file mode 100644
index 0000000..e2c17b0
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registers as a consumer for data over the Beam Fn API. Multiplexes any received data
+ * to all consumers in the specified output map.
+ *
+ * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s.
+ * For each request, call {@link #registerInputLocation()} to start and call
+ * {@link #blockTillReadFinishes()} to finish.
+ */
+public class BeamFnDataReadRunner<OutputT> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final String URN = "urn:org.apache.beam:source:runner:0.1";
+
+ /** A registrar which provides a factory to handle reading from the Fn Api Data Plane. */
+ @AutoService(PTransformRunnerFactory.Registrar.class)
+ public static class Registrar implements
+ PTransformRunnerFactory.Registrar {
+
+ @Override
+ public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
+ return ImmutableMap.of(URN, new Factory());
+ }
+ }
+
+ /** A factory for {@link BeamFnDataReadRunner}s. */
+ static class Factory<OutputT>
+ implements PTransformRunnerFactory<BeamFnDataReadRunner<OutputT>> {
+
+ @Override
+ public BeamFnDataReadRunner<OutputT> createRunnerForPTransform(
+ PipelineOptions pipelineOptions,
+ BeamFnDataClient beamFnDataClient,
+ String pTransformId,
+ RunnerApi.PTransform pTransform,
+ Supplier<String> processBundleInstructionId,
+ Map<String, RunnerApi.PCollection> pCollections,
+ Map<String, RunnerApi.Coder> coders,
+ Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+ Consumer<ThrowingRunnable> addStartFunction,
+ Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
+
+ BeamFnApi.Target target = BeamFnApi.Target.newBuilder()
+ .setPrimitiveTransformReference(pTransformId)
+ .setName(getOnlyElement(pTransform.getOutputsMap().keySet()))
+ .build();
+ RunnerApi.Coder coderSpec = coders.get(pCollections.get(
+ getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
+ Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers =
+ (Collection) pCollectionIdsToConsumers.get(
+ getOnlyElement(pTransform.getOutputsMap().values()));
+
+ BeamFnDataReadRunner<OutputT> runner = new BeamFnDataReadRunner<>(
+ pTransform.getSpec(),
+ processBundleInstructionId,
+ target,
+ coderSpec,
+ beamFnDataClient,
+ consumers);
+ addStartFunction.accept(runner::registerInputLocation);
+ addFinishFunction.accept(runner::blockTillReadFinishes);
+ return runner;
+ }
+ }
+
+ private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+ private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
+ private final Supplier<String> processBundleInstructionIdSupplier;
+ private final BeamFnDataClient beamFnDataClientFactory;
+ private final Coder<WindowedValue<OutputT>> coder;
+ private final BeamFnApi.Target inputTarget;
+
+ private CompletableFuture<Void> readFuture;
+
+ BeamFnDataReadRunner(
+ RunnerApi.FunctionSpec functionSpec,
+ Supplier<String> processBundleInstructionIdSupplier,
+ BeamFnApi.Target inputTarget,
+ RunnerApi.Coder coderSpec,
+ BeamFnDataClient beamFnDataClientFactory,
+ Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers)
+ throws IOException {
+ this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
+ .getApiServiceDescriptor();
+ this.inputTarget = inputTarget;
+ this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
+ this.beamFnDataClientFactory = beamFnDataClientFactory;
+ this.consumers = consumers;
+
+ @SuppressWarnings("unchecked")
+ Coder<WindowedValue<OutputT>> coder =
+ (Coder<WindowedValue<OutputT>>)
+ CloudObjects.coderFromCloudObject(
+ CloudObject.fromSpec(
+ OBJECT_MAPPER.readValue(
+ coderSpec
+ .getSpec()
+ .getSpec()
+ .getParameter()
+ .unpack(BytesValue.class)
+ .getValue()
+ .newInput(),
+ Map.class)));
+ this.coder = coder;
+ }
+
+ public void registerInputLocation() {
+ this.readFuture = beamFnDataClientFactory.forInboundConsumer(
+ apiServiceDescriptor,
+ KV.of(processBundleInstructionIdSupplier.get(), inputTarget),
+ coder,
+ this::multiplexToConsumers);
+ }
+
+ public void blockTillReadFinishes() throws Exception {
+ LOG.debug("Waiting for process bundle instruction {} and target {} to close.",
+ processBundleInstructionIdSupplier.get(), inputTarget);
+ readFuture.get();
+ }
+
+ private void multiplexToConsumers(WindowedValue<OutputT> value) throws Exception {
+ for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) {
+ consumer.accept(value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
new file mode 100644
index 0000000..eec4dfd
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Registers as a consumer with the Beam Fn Data Api. Consumes elements and encodes them for
+ * transmission.
+ *
+ * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s.
+ * For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish.
+ */
+public class BeamFnDataWriteRunner<InputT> {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final String URN = "urn:org.apache.beam:sink:runner:0.1";
+
+ /** A registrar which provides a factory to handle writing to the Fn Api Data Plane. */
+ @AutoService(PTransformRunnerFactory.Registrar.class)
+ public static class Registrar implements
+ PTransformRunnerFactory.Registrar {
+
+ @Override
+ public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
+ return ImmutableMap.of(URN, new Factory());
+ }
+ }
+
+ /** A factory for {@link BeamFnDataWriteRunner}s. */
+ static class Factory<InputT>
+ implements PTransformRunnerFactory<BeamFnDataWriteRunner<InputT>> {
+
+ @Override
+ public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(
+ PipelineOptions pipelineOptions,
+ BeamFnDataClient beamFnDataClient,
+ String pTransformId,
+ RunnerApi.PTransform pTransform,
+ Supplier<String> processBundleInstructionId,
+ Map<String, RunnerApi.PCollection> pCollections,
+ Map<String, RunnerApi.Coder> coders,
+ Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+ Consumer<ThrowingRunnable> addStartFunction,
+ Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
+ BeamFnApi.Target target = BeamFnApi.Target.newBuilder()
+ .setPrimitiveTransformReference(pTransformId)
+ .setName(getOnlyElement(pTransform.getInputsMap().keySet()))
+ .build();
+ RunnerApi.Coder coderSpec = coders.get(
+ pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId());
+ BeamFnDataWriteRunner<InputT> runner =
+ new BeamFnDataWriteRunner<>(
+ pTransform.getSpec(),
+ processBundleInstructionId,
+ target,
+ coderSpec,
+ beamFnDataClient);
+ addStartFunction.accept(runner::registerForOutput);
+ pCollectionIdsToConsumers.put(
+ getOnlyElement(pTransform.getInputsMap().values()),
+ (ThrowingConsumer)
+ (ThrowingConsumer<WindowedValue<InputT>>) runner::consume);
+ addFinishFunction.accept(runner::close);
+ return runner;
+ }
+ }
+
+ private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+ private final BeamFnApi.Target outputTarget;
+ private final Coder<WindowedValue<InputT>> coder;
+ private final BeamFnDataClient beamFnDataClientFactory;
+ private final Supplier<String> processBundleInstructionIdSupplier;
+
+ private CloseableThrowingConsumer<WindowedValue<InputT>> consumer;
+
+ BeamFnDataWriteRunner(
+ RunnerApi.FunctionSpec functionSpec,
+ Supplier<String> processBundleInstructionIdSupplier,
+ BeamFnApi.Target outputTarget,
+ RunnerApi.Coder coderSpec,
+ BeamFnDataClient beamFnDataClientFactory)
+ throws IOException {
+ this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
+ .getApiServiceDescriptor();
+ this.beamFnDataClientFactory = beamFnDataClientFactory;
+ this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
+ this.outputTarget = outputTarget;
+
+ @SuppressWarnings("unchecked")
+ Coder<WindowedValue<InputT>> coder =
+ (Coder<WindowedValue<InputT>>)
+ CloudObjects.coderFromCloudObject(
+ CloudObject.fromSpec(
+ OBJECT_MAPPER.readValue(
+ coderSpec
+ .getSpec()
+ .getSpec()
+ .getParameter()
+ .unpack(BytesValue.class)
+ .getValue()
+ .newInput(),
+ Map.class)));
+ this.coder = coder;
+ }
+
+ public void registerForOutput() {
+ consumer = beamFnDataClientFactory.forOutboundConsumer(
+ apiServiceDescriptor,
+ KV.of(processBundleInstructionIdSupplier.get(), outputTarget),
+ coder);
+ }
+
+ public void close() throws Exception {
+ consumer.close();
+ }
+
+ public void consume(WindowedValue<InputT> value) throws Exception {
+ consumer.accept(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
new file mode 100644
index 0000000..977e803
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source.Reader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A runner which creates {@link Reader}s for each {@link BoundedSource} sent as an input and
+ * executes the {@link Reader}s read loop.
+ */
+public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> {
+
+ private static final String URN = "urn:org.apache.beam:source:java:0.1";
+
+ /** A registrar which provides a factory to handle Java {@link BoundedSource}s. */
+ @AutoService(PTransformRunnerFactory.Registrar.class)
+ public static class Registrar implements
+ PTransformRunnerFactory.Registrar {
+
+ @Override
+ public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
+ return ImmutableMap.of(URN, new Factory());
+ }
+ }
+
+ /** A factory for {@link BoundedSourceRunner}. */
+ static class Factory<InputT extends BoundedSource<OutputT>, OutputT>
+ implements PTransformRunnerFactory<BoundedSourceRunner<InputT, OutputT>> {
+ @Override
+ public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform(
+ PipelineOptions pipelineOptions,
+ BeamFnDataClient beamFnDataClient,
+ String pTransformId,
+ RunnerApi.PTransform pTransform,
+ Supplier<String> processBundleInstructionId,
+ Map<String, RunnerApi.PCollection> pCollections,
+ Map<String, RunnerApi.Coder> coders,
+ Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+ Consumer<ThrowingRunnable> addStartFunction,
+ Consumer<ThrowingRunnable> addFinishFunction) {
+
+ ImmutableList.Builder<ThrowingConsumer<WindowedValue<?>>> consumers = ImmutableList.builder();
+ for (String pCollectionId : pTransform.getOutputsMap().values()) {
+ consumers.addAll(pCollectionIdsToConsumers.get(pCollectionId));
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ BoundedSourceRunner<InputT, OutputT> runner = new BoundedSourceRunner(
+ pipelineOptions,
+ pTransform.getSpec(),
+ consumers.build());
+
+ // TODO: Remove and replace with source being sent across gRPC port
+ addStartFunction.accept(runner::start);
+
+ ThrowingConsumer runReadLoop =
+ (ThrowingConsumer<WindowedValue<InputT>>) runner::runReadLoop;
+ for (String pCollectionId : pTransform.getInputsMap().values()) {
+ pCollectionIdsToConsumers.put(
+ pCollectionId,
+ runReadLoop);
+ }
+
+ return runner;
+ }
+ }
+
+ private final PipelineOptions pipelineOptions;
+ private final RunnerApi.FunctionSpec definition;
+ private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
+
+ BoundedSourceRunner(
+ PipelineOptions pipelineOptions,
+ RunnerApi.FunctionSpec definition,
+ Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers) {
+ this.pipelineOptions = pipelineOptions;
+ this.definition = definition;
+ this.consumers = consumers;
+ }
+
+ /**
+ * The runner harness is meant to send the source over the Beam Fn Data API which would be
+ * consumed by the {@link #runReadLoop}. Drop this method once the runner harness sends the
+ * source instead of unpacking it from the data block of the function specification.
+ */
+ @Deprecated
+ public void start() throws Exception {
+ try {
+ // The representation here is defined as the java serialized representation of the
+ // bounded source object packed into a protobuf Any using a protobuf BytesValue wrapper.
+ byte[] bytes = definition.getParameter().unpack(BytesValue.class).getValue().toByteArray();
+ @SuppressWarnings("unchecked")
+ InputT boundedSource =
+ (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString());
+ runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource));
+ } catch (InvalidProtocolBufferException e) {
+ throw new IOException(
+ String.format("Failed to decode %s, expected %s",
+ definition.getParameter().getTypeUrl(), BytesValue.getDescriptor().getFullName()),
+ e);
+ }
+ }
+
+ /**
+ * Creates a {@link Reader} for each {@link BoundedSource} and executes the {@link Reader}s
+ * read loop. See {@link Reader} for further details of the read loop.
+ *
+ * <p>Propagates any exceptions caused during reading or processing via a consumer to the
+ * caller.
+ */
+ public void runReadLoop(WindowedValue<InputT> value) throws Exception {
+ try (Reader<OutputT> reader = value.getValue().createReader(pipelineOptions)) {
+ if (!reader.start()) {
+ // Reader has no data, immediately return
+ return;
+ }
+ do {
+ // TODO: Should this use the input window as the window for all the outputs?
+ WindowedValue<OutputT> nextValue = WindowedValue.timestampedValueInGlobalWindow(
+ reader.getCurrent(), reader.getCurrentTimestamp());
+ for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) {
+ consumer.accept(nextValue);
+ }
+ } while (reader.advance());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return definition.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
new file mode 100644
index 0000000..97bd71c
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.dataflow.util.DoFnInfo;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
+
+/**
+ * A {@link DoFnRunner} specific to integrating with the Fn Api. This is to remove the layers
+ * of abstraction caused by StateInternals/TimerInternals since they model state and timer
+ * concepts differently.
+ */
+public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+ /**
+ * A registrar which provides a factory to handle Java {@link DoFn}s.
+ */
+ @AutoService(PTransformRunnerFactory.Registrar.class)
+ public static class Registrar implements
+ PTransformRunnerFactory.Registrar {
+
+ @Override
+ public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
+ return ImmutableMap.of(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN, new Factory());
+ }
+ }
+
+ /**
+ * A factory for {@link FnApiDoFnRunner}.
+ *
+ * <p>Deserializes the {@link DoFnInfo} carried in the transform's spec, wires each of its output
+ * tags to the consumers registered on the matching output PCollection, registers the runner's
+ * {@code processElement} as a consumer of every input PCollection, and registers
+ * {@code startBundle}/{@code finishBundle} as start/finish functions.
+ */
+ static class Factory<InputT, OutputT>
+ implements PTransformRunnerFactory<DoFnRunner<InputT, OutputT>> {
+
+ @Override
+ public DoFnRunner<InputT, OutputT> createRunnerForPTransform(
+ PipelineOptions pipelineOptions,
+ BeamFnDataClient beamFnDataClient,
+ String pTransformId,
+ RunnerApi.PTransform pTransform,
+ Supplier<String> processBundleInstructionId,
+ Map<String, RunnerApi.PCollection> pCollections,
+ Map<String, RunnerApi.Coder> coders,
+ Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+ Consumer<ThrowingRunnable> addStartFunction,
+ Consumer<ThrowingRunnable> addFinishFunction) {
+
+ // For every output PCollection, create a map from output name to Consumer
+ ImmutableMap.Builder<String, Collection<ThrowingConsumer<WindowedValue<?>>>>
+ outputMapBuilder = ImmutableMap.builder();
+ for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) {
+ outputMapBuilder.put(
+ entry.getKey(),
+ pCollectionIdsToConsumers.get(entry.getValue()));
+ }
+ ImmutableMap<String, Collection<ThrowingConsumer<WindowedValue<?>>>> outputMap =
+ outputMapBuilder.build();
+
+ // Get the DoFnInfo from the serialized blob.
+ // The spec parameter is expected to be a BytesValue holding the Java-serialized DoFnInfo;
+ // any other packed type is reported as an IllegalArgumentException.
+ ByteString serializedFn;
+ try {
+ serializedFn = pTransform.getSpec().getParameter().unpack(BytesValue.class).getValue();
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalArgumentException(
+ String.format("Unable to unwrap DoFn %s", pTransform.getSpec()), e);
+ }
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ DoFnInfo<InputT, OutputT> doFnInfo = (DoFnInfo) SerializableUtils.deserializeFromByteArray(
+ serializedFn.toByteArray(), "DoFnInfo");
+
+ // Verify that the DoFnInfo tag to output map matches the output map on the PTransform.
+ // The transform's output names are parsed as longs to compare against the DoFnInfo keys; a
+ // non-numeric output name fails in Long.parseLong (NumberFormatException) before this check.
+ checkArgument(
+ Objects.equals(
+ new HashSet<>(Collections2.transform(outputMap.keySet(), Long::parseLong)),
+ doFnInfo.getOutputMap().keySet()),
+ "Unexpected mismatch between transform output map %s and DoFnInfo output map %s.",
+ outputMap.keySet(),
+ doFnInfo.getOutputMap());
+
+ // Re-key the consumers from output name to the DoFn's TupleTag for that output.
+ // NOTE(review): putAll presumably copies the consumers present right now, so consumers
+ // registered on these PCollections after this call would not be seen; confirm the caller
+ // creates downstream runners first.
+ ImmutableMultimap.Builder<TupleTag<?>,
+ ThrowingConsumer<WindowedValue<?>>> tagToOutputMapBuilder =
+ ImmutableMultimap.builder();
+ for (Map.Entry<Long, TupleTag<?>> entry : doFnInfo.getOutputMap().entrySet()) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Collection<ThrowingConsumer<WindowedValue<?>>> consumers =
+ outputMap.get(Long.toString(entry.getKey()));
+ tagToOutputMapBuilder.putAll(entry.getValue(), consumers);
+ }
+
+ ImmutableMultimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> tagToOutputMap =
+ tagToOutputMapBuilder.build();
+
+ // The main output consumers are looked up by the tag that DoFnInfo maps its main output key
+ // to (presumably getMainOutput() returns that Long key; verify against DoFnInfo).
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ DoFnRunner<InputT, OutputT> runner = new FnApiDoFnRunner<>(
+ pipelineOptions,
+ doFnInfo.getDoFn(),
+ (Collection<ThrowingConsumer<WindowedValue<OutputT>>>) (Collection)
+ tagToOutputMap.get(doFnInfo.getOutputMap().get(doFnInfo.getMainOutput())),
+ tagToOutputMap,
+ doFnInfo.getWindowingStrategy());
+
+ // Register the appropriate handlers.
+ addStartFunction.accept(runner::startBundle);
+ for (String pcollectionId : pTransform.getInputsMap().values()) {
+ pCollectionIdsToConsumers.put(
+ pcollectionId,
+ (ThrowingConsumer) (ThrowingConsumer<WindowedValue<InputT>>) runner::processElement);
+ }
+ addFinishFunction.accept(runner::finishBundle);
+ return runner;
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+
+ // Options returned to the DoFn through its contexts (see StartBundleContext below).
+ private final PipelineOptions pipelineOptions;
+ // The user DoFn; also the enclosing instance of the DoFn.*Context superclasses (doFn.super()).
+ private final DoFn<InputT, OutputT> doFn;
+ // Consumers of the DoFn's main output.
+ private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> mainOutputConsumers;
+ // Consumers of every output, keyed by the DoFn's output TupleTag.
+ private final Multimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> outputMap;
+ // Invoker created from doFn via DoFnInvokers.invokerFor.
+ private final DoFnInvoker<InputT, OutputT> doFnInvoker;
+ // Argument providers handed to the invoker for the bundle lifecycle methods; created once in
+ // the constructor and reused for every call.
+ private final StartBundleContext startBundleContext;
+ private final ProcessBundleContext processBundleContext;
+ private final FinishBundleContext finishBundleContext;
+
+ /**
+ * The element being processed. Set at the start of {@link #processElement(WindowedValue)} and
+ * reset to {@code null} when it returns or throws; only meaningful during that call.
+ */
+ private WindowedValue<InputT> currentElement;
+
+ /**
+ * The window of {@link #currentElement} currently being processed. Set for each window during
+ * {@link #processElement(WindowedValue)} and reset to {@code null} when it returns or throws.
+ */
+ private BoundedWindow currentWindow;
+
+  /**
+   * Creates a runner for {@code doFn}.
+   *
+   * @param pipelineOptions options made available to the {@link DoFn}'s contexts
+   * @param doFn the user function to invoke
+   * @param mainOutputConsumers consumers of the main output
+   * @param outputMap consumers of every output, keyed by output tag
+   * @param windowingStrategy accepted for interface compatibility; not retained by this constructor
+   */
+  FnApiDoFnRunner(
+      PipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      Collection<ThrowingConsumer<WindowedValue<OutputT>>> mainOutputConsumers,
+      Multimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> outputMap,
+      WindowingStrategy windowingStrategy) {
+    this.doFn = doFn;
+    this.doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    this.pipelineOptions = pipelineOptions;
+    this.outputMap = outputMap;
+    this.mainOutputConsumers = mainOutputConsumers;
+    // The contexts are inner classes (StartBundleContext's constructor dereferences doFn), so
+    // they are built only after every field above is assigned.
+    this.startBundleContext = new StartBundleContext();
+    this.processBundleContext = new ProcessBundleContext();
+    this.finishBundleContext = new FinishBundleContext();
+  }
+
+  /** Runs the DoFn's {@code @StartBundle} method using the start-bundle argument provider. */
+  @Override
+  public void startBundle() {
+    this.doFnInvoker.invokeStartBundle(this.startBundleContext);
+  }
+
+  /**
+   * Runs the DoFn's {@code @ProcessElement} method once for every window {@code elem} is in.
+   *
+   * <p>The element and the window being processed are published through {@code currentElement}
+   * and {@code currentWindow} for the duration of the call. Both are cleared afterwards, even if
+   * the DoFn throws.
+   */
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    currentElement = elem;
+    try {
+      for (BoundedWindow window : elem.getWindows()) {
+        currentWindow = window;
+        doFnInvoker.invokeProcessElement(processBundleContext);
+      }
+    } finally {
+      currentWindow = null;
+      currentElement = null;
+    }
+  }
+
+  /** Timer callbacks are not supported by this runner yet; always throws. */
+  @Override
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("TODO: Add support for timers");
+  }
+
+  /** Runs the DoFn's {@code @FinishBundle} method using the finish-bundle argument provider. */
+  @Override
+  public void finishBundle() {
+    this.doFnInvoker.invokeFinishBundle(this.finishBundleContext);
+  }
+
+  /**
+   * Delivers {@code output} to each consumer in {@code consumers}, in iteration order.
+   *
+   * <p>Anything a consumer throws (checked or unchecked) is passed through
+   * {@link UserCodeException#wrap} and rethrown; consumers after the failing one are not invoked.
+   *
+   * @param consumers the consumers to deliver to; an empty collection makes this a no-op
+   * @param output the windowed value to deliver
+   */
+  private <T> void outputTo(
+      Collection<ThrowingConsumer<WindowedValue<T>>> consumers,
+      WindowedValue<T> output) {
+    try {
+      for (ThrowingConsumer<WindowedValue<T>> consumer : consumers) {
+        consumer.accept(output);
+      }
+    } catch (Throwable t) {
+      throw UserCodeException.wrap(t);
+    }
+  }
+
+  /**
+   * Argument provider used while the DoFn's {@link DoFn.StartBundle @StartBundle} method runs.
+   *
+   * <p>Only the pipeline options and this start-bundle context are available. Every other
+   * accessor defined here throws {@link UnsupportedOperationException}.
+   */
+  private class StartBundleContext
+      extends DoFn<InputT, OutputT>.StartBundleContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private StartBundleContext() {
+      doFn.super();
+    }
+
+    // ---- Available during @StartBundle ----
+
+    @Override
+    public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public PipelineOptions pipelineOptions() {
+      return pipelineOptions;
+    }
+
+    // ---- Only meaningful in other lifecycle methods ----
+
+    @Override
+    public BoundedWindow window() {
+      throw new UnsupportedOperationException(
+          "Cannot access window outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access FinishBundleContext outside of @FinishBundle method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access ProcessContext outside of @ProcessElement method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access OnTimerContext outside of @OnTimer methods.");
+    }
+
+    @Override
+    public RestrictionTracker<?> restrictionTracker() {
+      throw new UnsupportedOperationException(
+          "Cannot access RestrictionTracker outside of @ProcessElement method.");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException(
+          "Cannot access state outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException(
+          "Cannot access timers outside of @ProcessElement and @OnTimer methods.");
+    }
+  }
+
+ /**
+ * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.ProcessElement @ProcessElement}.
+ */
+ private class ProcessBundleContext
+ extends DoFn<InputT, OutputT>.ProcessContext
+ implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+ private ProcessBundleContext() {
+ doFn.super();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return currentWindow;
+ }
+
+ @Override
+ public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ "Cannot access StartBundleContext outside of @StartBundle method.");
+ }
+
+ @Override
+ public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ "Cannot access FinishBundleContext outside of @FinishBundle method.");
+ }
+
+ @Override
+ public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+ return this;
+ }
+
+ @Override
+ public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException("TODO: Add support for timers");
+ }
+
+ @Override
+ public RestrictionTracker<?> restrictionTracker() {
+ throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn");
+ }
+
+ @Override
+ public State state(String stateId) {
+ throw new UnsupportedOperationException("TODO: Add support for state");
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ throw new UnsupportedOperationException("TODO: Add support for timers");
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return pipelineOptions;
+ }
+
+ @Override
+ public PipelineOptions pipelineOptions() {
+ return pipelineOptions;
+ }
+
+ @Override
+ public void output(OutputT output) {
+ outputTo(mainOutputConsumers,
+ WindowedValue.of(
+ output,
+ currentElement.getTimestamp(),
+ currentWindow,
+ currentElement.getPane()));
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ outputTo(mainOutputConsumers,
+ WindowedValue.of(
+ output,
+ timestamp,
+ currentWindow,
+ currentElement.getPane()));
+ }
+
+ @Override
+ public <T> void output(TupleTag<T> tag, T output) {
+ Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection) outputMap.get(tag);
+ if (consumers == null) {
+ throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+ }
+ outputTo(consumers,
+ WindowedValue.of(
+ output,
+ currentElement.getTimestamp(),
+ currentWindow,
+ currentElement.getPane()));
+ }
+
+ @Override
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection) outputMap.get(tag);
+ if (consumers == null) {
+ throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+ }
+ outputTo(consumers,
+ WindowedValue.of(
+ output,
+ timestamp,
+ currentWindow,
+ currentElement.getPane()));
+ }
+
+ @Override
+ public InputT element() {
+ return currentElement.getValue();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ throw new UnsupportedOperationException("TODO: Support side inputs");
+ }
+
+ @Override
+ public Instant timestamp() {
+ return currentElement.getTimestamp();
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return currentElement.getPane();
+ }
+
+ @Override
+ public void updateWatermark(Instant watermark) {
+ throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn");
+ }
+ }
+
+  /**
+   * Argument provider used while the DoFn's {@link DoFn.FinishBundle @FinishBundle} method runs.
+   *
+   * <p>Exposes the pipeline options and this finish-bundle context. Outputs are emitted with an
+   * explicit timestamp and window and with {@link PaneInfo#NO_FIRING}. Every other accessor
+   * defined here throws {@link UnsupportedOperationException}.
+   */
+  private class FinishBundleContext
+      extends DoFn<InputT, OutputT>.FinishBundleContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private FinishBundleContext() {
+      doFn.super();
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public PipelineOptions pipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      throw new UnsupportedOperationException(
+          "Cannot access window outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access StartBundleContext outside of @StartBundle method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access ProcessContext outside of @ProcessElement method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access OnTimerContext outside of @OnTimer methods.");
+    }
+
+    @Override
+    public RestrictionTracker<?> restrictionTracker() {
+      throw new UnsupportedOperationException(
+          "Cannot access RestrictionTracker outside of @ProcessElement method.");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException(
+          "Cannot access state outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException(
+          "Cannot access timers outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public void output(OutputT output, Instant timestamp, BoundedWindow window) {
+      outputTo(mainOutputConsumers,
+          WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
+      outputTo(consumersFor(tag),
+          WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
+    }
+
+    /**
+     * Returns the consumers registered for {@code tag}.
+     *
+     * <p>{@code outputMap} is a Guava {@link Multimap}, and {@code Multimap.get} never returns
+     * {@code null}. A tag with no registered consumers yields an empty collection, so output to
+     * it is dropped rather than rejected.
+     */
+    // NOTE(review): confirm that silently dropping output for unregistered tags is intended.
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <T> Collection<ThrowingConsumer<WindowedValue<T>>> consumersFor(TupleTag<T> tag) {
+      return (Collection) outputMap.get(tag);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
new file mode 100644
index 0000000..7cf0610
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A factory able to instantiate an appropriate handler for a given PTransform.
+ *
+ * @param <T> the type of handler created by this factory
+ */
+public interface PTransformRunnerFactory<T> {
+
+  /**
+   * Creates and returns a handler for a given PTransform. Note that the handler must support
+   * processing multiple bundles. The handler will be discarded if an error is thrown during
+   * element processing, or during execution of start/finish.
+   *
+   * @param pipelineOptions Pipeline options.
+   * @param beamFnDataClient A client for receiving and sending elements over the Beam Fn API
+   *     data plane, for handlers that read or write data there.
+   * @param pTransformId The id of the PTransform.
+   * @param pTransform The PTransform definition.
+   * @param processBundleInstructionId A supplier containing the active process bundle instruction
+   *     id.
+   * @param pCollections A mapping from PCollection id to PCollection definition.
+   * @param coders A mapping from coder id to coder definition.
+   * @param pCollectionIdsToConsumers A mapping from PCollection id to a collection of consumers.
+   *     Note that if this handler is a consumer, it should register itself within this multimap
+   *     under the appropriate PCollection ids. Also note that all output consumers needed by this
+   *     PTransform (based on the values of {@link RunnerApi.PTransform#getOutputsMap()}) will
+   *     have already been registered within this multimap.
+   * @param addStartFunction A consumer to register a start bundle handler with.
+   * @param addFinishFunction A consumer to register a finish bundle handler with.
+   * @return the handler created for {@code pTransform}
+   * @throws IOException if the handler cannot be created, for example because part of the
+   *     PTransform or coder specification cannot be decoded
+   */
+  T createRunnerForPTransform(
+      PipelineOptions pipelineOptions,
+      BeamFnDataClient beamFnDataClient,
+      String pTransformId,
+      RunnerApi.PTransform pTransform,
+      Supplier<String> processBundleInstructionId,
+      Map<String, RunnerApi.PCollection> pCollections,
+      Map<String, RunnerApi.Coder> coders,
+      Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+      Consumer<ThrowingRunnable> addStartFunction,
+      Consumer<ThrowingRunnable> addFinishFunction) throws IOException;
+
+  /**
+   * A registrar which can return a mapping from {@link RunnerApi.FunctionSpec#getUrn()} to
+   * a factory capable of instantiating an appropriate handler.
+   */
+  interface Registrar {
+    /**
+     * Returns a mapping from {@link RunnerApi.FunctionSpec#getUrn()} to a factory capable of
+     * instantiating an appropriate handler.
+     */
+    Map<String, PTransformRunnerFactory> getPTransformRunnerFactories();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 2a9cef8..1e73570 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -34,12 +34,12 @@ import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
+import org.apache.beam.fn.harness.PTransformRunnerFactory;
+import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.PTransformRunnerFactory;
-import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
deleted file mode 100644
index 9339347..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import static com.google.common.collect.Iterables.getOnlyElement;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Registers as a consumer for data over the Beam Fn API. Multiplexes any received data
- * to all consumers in the specified output map.
- *
- * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s.
- * For each request, call {@link #registerInputLocation()} to start and call
- * {@link #blockTillReadFinishes()} to finish.
- */
-public class BeamFnDataReadRunner<OutputT> {
-
- private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- // URN under which the Registrar below publishes this runner's factory.
- private static final String URN = "urn:org.apache.beam:source:runner:0.1";
-
- /** A registrar which provides a factory to handle reading from the Fn Api Data Plane. */
- @AutoService(PTransformRunnerFactory.Registrar.class)
- public static class Registrar implements
- PTransformRunnerFactory.Registrar {
-
- @Override
- public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
- return ImmutableMap.of(URN, new Factory());
- }
- }
-
- /** A factory for {@link BeamFnDataReadRunner}s. */
- static class Factory<OutputT>
- implements PTransformRunnerFactory<BeamFnDataReadRunner<OutputT>> {
-
- // Builds a runner for a PTransform with exactly one output (getOnlyElement throws otherwise)
- // and wires its start/finish hooks to registerInputLocation/blockTillReadFinishes.
- @Override
- public BeamFnDataReadRunner<OutputT> createRunnerForPTransform(
- PipelineOptions pipelineOptions,
- BeamFnDataClient beamFnDataClient,
- String pTransformId,
- RunnerApi.PTransform pTransform,
- Supplier<String> processBundleInstructionId,
- Map<String, RunnerApi.PCollection> pCollections,
- Map<String, RunnerApi.Coder> coders,
- Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
- Consumer<ThrowingRunnable> addStartFunction,
- Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
-
- BeamFnApi.Target target = BeamFnApi.Target.newBuilder()
- .setPrimitiveTransformReference(pTransformId)
- .setName(getOnlyElement(pTransform.getOutputsMap().keySet()))
- .build();
- RunnerApi.Coder coderSpec = coders.get(pCollections.get(
- getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
- // Unchecked raw cast: consumers registered for the output PCollection are assumed to accept
- // WindowedValue<OutputT>.
- Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers =
- (Collection) pCollectionIdsToConsumers.get(
- getOnlyElement(pTransform.getOutputsMap().values()));
-
- BeamFnDataReadRunner<OutputT> runner = new BeamFnDataReadRunner<>(
- pTransform.getSpec(),
- processBundleInstructionId,
- target,
- coderSpec,
- beamFnDataClient,
- consumers);
- addStartFunction.accept(runner::registerInputLocation);
- addFinishFunction.accept(runner::blockTillReadFinishes);
- return runner;
- }
- }
-
- private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
- private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
- private final Supplier<String> processBundleInstructionIdSupplier;
- private final BeamFnDataClient beamFnDataClientFactory;
- private final Coder<WindowedValue<OutputT>> coder;
- private final BeamFnApi.Target inputTarget;
-
- // Set by registerInputLocation(); null until then.
- private CompletableFuture<Void> readFuture;
-
- /**
- * Creates a reader for one PTransform. The data-plane endpoint is unpacked from the
- * {@link BeamFnApi.RemoteGrpcPort} parameter of {@code functionSpec}, and the element coder is
- * decoded from the JSON cloud-object spec carried in {@code coderSpec}.
- *
- * @throws IOException if either specification cannot be unpacked or parsed
- */
- BeamFnDataReadRunner(
- RunnerApi.FunctionSpec functionSpec,
- Supplier<String> processBundleInstructionIdSupplier,
- BeamFnApi.Target inputTarget,
- RunnerApi.Coder coderSpec,
- BeamFnDataClient beamFnDataClientFactory,
- Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers)
- throws IOException {
- this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
- .getApiServiceDescriptor();
- this.inputTarget = inputTarget;
- this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
- this.beamFnDataClientFactory = beamFnDataClientFactory;
- this.consumers = consumers;
-
- @SuppressWarnings("unchecked")
- Coder<WindowedValue<OutputT>> coder =
- (Coder<WindowedValue<OutputT>>)
- CloudObjects.coderFromCloudObject(
- CloudObject.fromSpec(
- OBJECT_MAPPER.readValue(
- coderSpec
- .getSpec()
- .getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
- .newInput(),
- Map.class)));
- this.coder = coder;
- }
-
- /**
- * Registers with the data client to receive elements for the current process bundle
- * instruction id and {@code inputTarget}; each decoded element is forwarded to every consumer.
- */
- public void registerInputLocation() {
- this.readFuture = beamFnDataClientFactory.forInboundConsumer(
- apiServiceDescriptor,
- KV.of(processBundleInstructionIdSupplier.get(), inputTarget),
- coder,
- this::multiplexToConsumers);
- }
-
- /**
- * Blocks until the read registered by {@link #registerInputLocation()} completes, propagating
- * any failure as thrown by {@code Future.get()}. Must only be called after
- * {@link #registerInputLocation()}; otherwise {@code readFuture} is null.
- */
- public void blockTillReadFinishes() throws Exception {
- LOG.debug("Waiting for process bundle instruction {} and target {} to close.",
- processBundleInstructionIdSupplier.get(), inputTarget);
- readFuture.get();
- }
-
- /** Forwards {@code value} to each consumer in turn; the first exception stops delivery. */
- private void multiplexToConsumers(WindowedValue<OutputT> value) throws Exception {
- for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) {
- consumer.accept(value);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
deleted file mode 100644
index c2a996b..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import static com.google.common.collect.Iterables.getOnlyElement;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
-import java.io.IOException;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * Registers as a consumer with the Beam Fn Data Api. Consumes elements and encodes them for
- * transmission.
- *
- * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s.
- * For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish.
- */
-public class BeamFnDataWriteRunner<InputT> {
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- // URN under which the Registrar below publishes this runner's factory.
- private static final String URN = "urn:org.apache.beam:sink:runner:0.1";
-
- /** A registrar which provides a factory to handle writing to the Fn Api Data Plane. */
- @AutoService(PTransformRunnerFactory.Registrar.class)
- public static class Registrar implements
- PTransformRunnerFactory.Registrar {
-
- @Override
- public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
- return ImmutableMap.of(URN, new Factory());
- }
- }
-
- /** A factory for {@link BeamFnDataWriteRunner}s. */
- static class Factory<InputT>
- implements PTransformRunnerFactory<BeamFnDataWriteRunner<InputT>> {
-
- // Builds a runner for a PTransform with exactly one input (getOnlyElement throws otherwise),
- // registers it as a consumer of that input, and wires registerForOutput/close as the
- // start/finish hooks.
- @Override
- public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(
- PipelineOptions pipelineOptions,
- BeamFnDataClient beamFnDataClient,
- String pTransformId,
- RunnerApi.PTransform pTransform,
- Supplier<String> processBundleInstructionId,
- Map<String, RunnerApi.PCollection> pCollections,
- Map<String, RunnerApi.Coder> coders,
- Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
- Consumer<ThrowingRunnable> addStartFunction,
- Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
- BeamFnApi.Target target = BeamFnApi.Target.newBuilder()
- .setPrimitiveTransformReference(pTransformId)
- .setName(getOnlyElement(pTransform.getInputsMap().keySet()))
- .build();
- RunnerApi.Coder coderSpec = coders.get(
- pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId());
- BeamFnDataWriteRunner<InputT> runner =
- new BeamFnDataWriteRunner<>(
- pTransform.getSpec(),
- processBundleInstructionId,
- target,
- coderSpec,
- beamFnDataClient);
- addStartFunction.accept(runner::registerForOutput);
- pCollectionIdsToConsumers.put(
- getOnlyElement(pTransform.getInputsMap().values()),
- (ThrowingConsumer)
- (ThrowingConsumer<WindowedValue<InputT>>) runner::consume);
- addFinishFunction.accept(runner::close);
- return runner;
- }
- }
-
- private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
- private final BeamFnApi.Target outputTarget;
- private final Coder<WindowedValue<InputT>> coder;
- private final BeamFnDataClient beamFnDataClientFactory;
- private final Supplier<String> processBundleInstructionIdSupplier;
-
- // Set by registerForOutput(); null until then.
- private CloseableThrowingConsumer<WindowedValue<InputT>> consumer;
-
- /**
- * Creates a writer for one PTransform. The data-plane endpoint is unpacked from the
- * {@link BeamFnApi.RemoteGrpcPort} parameter of {@code functionSpec}, and the element coder is
- * decoded from the JSON cloud-object spec carried in {@code coderSpec}.
- *
- * @throws IOException if either specification cannot be unpacked or parsed
- */
- BeamFnDataWriteRunner(
- RunnerApi.FunctionSpec functionSpec,
- Supplier<String> processBundleInstructionIdSupplier,
- BeamFnApi.Target outputTarget,
- RunnerApi.Coder coderSpec,
- BeamFnDataClient beamFnDataClientFactory)
- throws IOException {
- this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
- .getApiServiceDescriptor();
- this.beamFnDataClientFactory = beamFnDataClientFactory;
- this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
- this.outputTarget = outputTarget;
-
- @SuppressWarnings("unchecked")
- Coder<WindowedValue<InputT>> coder =
- (Coder<WindowedValue<InputT>>)
- CloudObjects.coderFromCloudObject(
- CloudObject.fromSpec(
- OBJECT_MAPPER.readValue(
- coderSpec
- .getSpec()
- .getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
- .newInput(),
- Map.class)));
- this.coder = coder;
- }
-
- /**
- * Opens an outbound consumer on the data client for the current process bundle instruction id
- * and {@code outputTarget}; elements passed to {@link #consume} are sent to it.
- */
- public void registerForOutput() {
- consumer = beamFnDataClientFactory.forOutboundConsumer(
- apiServiceDescriptor,
- KV.of(processBundleInstructionIdSupplier.get(), outputTarget),
- coder);
- }
-
- /**
- * Closes the outbound consumer. Must only be called after {@link #registerForOutput()};
- * otherwise {@code consumer} is null.
- */
- public void close() throws Exception {
- consumer.close();
- }
-
- /** Passes one element to the outbound consumer opened by {@link #registerForOutput()}. */
- public void consume(WindowedValue<InputT> value) throws Exception {
- consumer.accept(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
deleted file mode 100644
index 3338c3a..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source.Reader;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * A runner which creates {@link Reader}s for each {@link BoundedSource} sent as an input and
- * executes the {@link Reader}s read loop.
- */
-public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> {
-
- private static final String URN = "urn:org.apache.beam:source:java:0.1";
-
- /** A registrar which provides a factory to handle Java {@link BoundedSource}s. */
- @AutoService(PTransformRunnerFactory.Registrar.class)
- public static class Registrar implements
- PTransformRunnerFactory.Registrar {
-
- @Override
- public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
- return ImmutableMap.of(URN, new Factory());
- }
- }
-
- /** A factory for {@link BoundedSourceRunner}. */
- static class Factory<InputT extends BoundedSource<OutputT>, OutputT>
- implements PTransformRunnerFactory<BoundedSourceRunner<InputT, OutputT>> {
- @Override
- public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform(
- PipelineOptions pipelineOptions,
- BeamFnDataClient beamFnDataClient,
- String pTransformId,
- RunnerApi.PTransform pTransform,
- Supplier<String> processBundleInstructionId,
- Map<String, RunnerApi.PCollection> pCollections,
- Map<String, RunnerApi.Coder> coders,
- Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
- Consumer<ThrowingRunnable> addStartFunction,
- Consumer<ThrowingRunnable> addFinishFunction) {
-
- ImmutableList.Builder<ThrowingConsumer<WindowedValue<?>>> consumers = ImmutableList.builder();
- for (String pCollectionId : pTransform.getOutputsMap().values()) {
- consumers.addAll(pCollectionIdsToConsumers.get(pCollectionId));
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- BoundedSourceRunner<InputT, OutputT> runner = new BoundedSourceRunner(
- pipelineOptions,
- pTransform.getSpec(),
- consumers.build());
-
- // TODO: Remove and replace with source being sent across gRPC port
- addStartFunction.accept(runner::start);
-
- ThrowingConsumer runReadLoop =
- (ThrowingConsumer<WindowedValue<InputT>>) runner::runReadLoop;
- for (String pCollectionId : pTransform.getInputsMap().values()) {
- pCollectionIdsToConsumers.put(
- pCollectionId,
- runReadLoop);
- }
-
- return runner;
- }
- }
-
- private final PipelineOptions pipelineOptions;
- private final RunnerApi.FunctionSpec definition;
- private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
-
- BoundedSourceRunner(
- PipelineOptions pipelineOptions,
- RunnerApi.FunctionSpec definition,
- Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers) {
- this.pipelineOptions = pipelineOptions;
- this.definition = definition;
- this.consumers = consumers;
- }
-
- /**
- * The runner harness is meant to send the source over the Beam Fn Data API which would be
- * consumed by the {@link #runReadLoop}. Drop this method once the runner harness sends the
- * source instead of unpacking it from the data block of the function specification.
- */
- @Deprecated
- public void start() throws Exception {
- try {
- // The representation here is defined as the java serialized representation of the
- // bounded source object packed into a protobuf Any using a protobuf BytesValue wrapper.
- byte[] bytes = definition.getParameter().unpack(BytesValue.class).getValue().toByteArray();
- @SuppressWarnings("unchecked")
- InputT boundedSource =
- (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString());
- runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource));
- } catch (InvalidProtocolBufferException e) {
- throw new IOException(
- String.format("Failed to decode %s, expected %s",
- definition.getParameter().getTypeUrl(), BytesValue.getDescriptor().getFullName()),
- e);
- }
- }
-
- /**
- * Creates a {@link Reader} for each {@link BoundedSource} and executes the {@link Reader}s
- * read loop. See {@link Reader} for further details of the read loop.
- *
- * <p>Propagates any exceptions caused during reading or processing via a consumer to the
- * caller.
- */
- public void runReadLoop(WindowedValue<InputT> value) throws Exception {
- try (Reader<OutputT> reader = value.getValue().createReader(pipelineOptions)) {
- if (!reader.start()) {
- // Reader has no data, immediately return
- return;
- }
- do {
- // TODO: Should this use the input window as the window for all the outputs?
- WindowedValue<OutputT> nextValue = WindowedValue.timestampedValueInGlobalWindow(
- reader.getCurrent(), reader.getCurrentTimestamp());
- for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) {
- consumer.accept(nextValue);
- }
- } while (reader.advance());
- }
- }
-
- @Override
- public String toString() {
- return definition.toString();
- }
-}