You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/18 20:05:36 UTC

[3/4] beam git commit: Fix split package in SDK harness

Fix split package in SDK harness

The Java SDK harness defined classes both in its own namespace
org.apache.beam.fn.harness and the org.apache.beam.runners.core namespace,
resulting in a split package across multiple jars.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1b4700f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1b4700f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1b4700f

Branch: refs/heads/master
Commit: f1b4700f32c5ea39559145d6f5db3909439f6c80
Parents: 7e4719c
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 17 13:46:46 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 17 13:46:46 2017 -0700

----------------------------------------------------------------------
 .../beam/fn/harness/BeamFnDataReadRunner.java   | 173 ++++++
 .../beam/fn/harness/BeamFnDataWriteRunner.java  | 159 ++++++
 .../beam/fn/harness/BoundedSourceRunner.java    | 167 ++++++
 .../apache/beam/fn/harness/FnApiDoFnRunner.java | 548 +++++++++++++++++++
 .../fn/harness/PTransformRunnerFactory.java     |  81 +++
 .../harness/control/ProcessBundleHandler.java   |   4 +-
 .../beam/runners/core/BeamFnDataReadRunner.java | 173 ------
 .../runners/core/BeamFnDataWriteRunner.java     | 159 ------
 .../beam/runners/core/BoundedSourceRunner.java  | 167 ------
 .../beam/runners/core/FnApiDoFnRunner.java      | 547 ------------------
 .../runners/core/PTransformRunnerFactory.java   |  81 ---
 .../apache/beam/runners/core/package-info.java  |  22 -
 .../fn/harness/BeamFnDataReadRunnerTest.java    | 281 ++++++++++
 .../fn/harness/BeamFnDataWriteRunnerTest.java   | 269 +++++++++
 .../fn/harness/BoundedSourceRunnerTest.java     | 187 +++++++
 .../beam/fn/harness/FnApiDoFnRunnerTest.java    | 210 +++++++
 .../control/ProcessBundleHandlerTest.java       |   2 +-
 .../runners/core/BeamFnDataReadRunnerTest.java  | 281 ----------
 .../runners/core/BeamFnDataWriteRunnerTest.java | 269 ---------
 .../runners/core/BoundedSourceRunnerTest.java   | 187 -------
 .../beam/runners/core/FnApiDoFnRunnerTest.java  | 210 -------
 21 files changed, 2078 insertions(+), 2099 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
new file mode 100644
index 0000000..e2c17b0
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registers as a consumer for data over the Beam Fn API. Multiplexes any received data
+ * to all consumers in the specified output map.
+ *
+ * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s.
+ * For each request, call {@link #registerInputLocation()} to start and call
+ * {@link #blockTillReadFinishes()} to finish.
+ */
+public class BeamFnDataReadRunner<OutputT> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final String URN = "urn:org.apache.beam:source:runner:0.1";
+
+  /** A registrar which provides a factory to handle reading from the Fn Api Data Plane. */
+  @AutoService(PTransformRunnerFactory.Registrar.class)
+  public static class Registrar implements
+      PTransformRunnerFactory.Registrar {
+
+    @Override
+    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
+      return ImmutableMap.of(URN, new Factory());
+    }
+  }
+
+  /** A factory for {@link BeamFnDataReadRunner}s. */
+  static class Factory<OutputT>
+      implements PTransformRunnerFactory<BeamFnDataReadRunner<OutputT>> {
+
+    @Override
+    public BeamFnDataReadRunner<OutputT> createRunnerForPTransform(
+        PipelineOptions pipelineOptions,
+        BeamFnDataClient beamFnDataClient,
+        String pTransformId,
+        RunnerApi.PTransform pTransform,
+        Supplier<String> processBundleInstructionId,
+        Map<String, RunnerApi.PCollection> pCollections,
+        Map<String, RunnerApi.Coder> coders,
+        Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+        Consumer<ThrowingRunnable> addStartFunction,
+        Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
+
+      BeamFnApi.Target target = BeamFnApi.Target.newBuilder()
+          .setPrimitiveTransformReference(pTransformId)
+          .setName(getOnlyElement(pTransform.getOutputsMap().keySet()))
+          .build();
+      RunnerApi.Coder coderSpec = coders.get(pCollections.get(
+          getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
+      Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers =
+          (Collection) pCollectionIdsToConsumers.get(
+              getOnlyElement(pTransform.getOutputsMap().values()));
+
+      BeamFnDataReadRunner<OutputT> runner = new BeamFnDataReadRunner<>(
+          pTransform.getSpec(),
+          processBundleInstructionId,
+          target,
+          coderSpec,
+          beamFnDataClient,
+          consumers);
+      addStartFunction.accept(runner::registerInputLocation);
+      addFinishFunction.accept(runner::blockTillReadFinishes);
+      return runner;
+    }
+  }
+
+  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
+  private final Supplier<String> processBundleInstructionIdSupplier;
+  private final BeamFnDataClient beamFnDataClientFactory;
+  private final Coder<WindowedValue<OutputT>> coder;
+  private final BeamFnApi.Target inputTarget;
+
+  private CompletableFuture<Void> readFuture;
+
+  BeamFnDataReadRunner(
+      RunnerApi.FunctionSpec functionSpec,
+      Supplier<String> processBundleInstructionIdSupplier,
+      BeamFnApi.Target inputTarget,
+      RunnerApi.Coder coderSpec,
+      BeamFnDataClient beamFnDataClientFactory,
+      Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers)
+          throws IOException {
+    this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
+        .getApiServiceDescriptor();
+    this.inputTarget = inputTarget;
+    this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
+    this.beamFnDataClientFactory = beamFnDataClientFactory;
+    this.consumers = consumers;
+
+    @SuppressWarnings("unchecked")
+    Coder<WindowedValue<OutputT>> coder =
+        (Coder<WindowedValue<OutputT>>)
+            CloudObjects.coderFromCloudObject(
+                CloudObject.fromSpec(
+                    OBJECT_MAPPER.readValue(
+                        coderSpec
+                            .getSpec()
+                            .getSpec()
+                            .getParameter()
+                            .unpack(BytesValue.class)
+                            .getValue()
+                            .newInput(),
+                        Map.class)));
+    this.coder = coder;
+  }
+
+  public void registerInputLocation() {
+    this.readFuture = beamFnDataClientFactory.forInboundConsumer(
+        apiServiceDescriptor,
+        KV.of(processBundleInstructionIdSupplier.get(), inputTarget),
+        coder,
+        this::multiplexToConsumers);
+  }
+
+  public void blockTillReadFinishes() throws Exception {
+    LOG.debug("Waiting for process bundle instruction {} and target {} to close.",
+        processBundleInstructionIdSupplier.get(), inputTarget);
+    readFuture.get();
+  }
+
+  private void multiplexToConsumers(WindowedValue<OutputT> value) throws Exception {
+    for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) {
+      consumer.accept(value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
new file mode 100644
index 0000000..eec4dfd
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Registers as a consumer with the Beam Fn Data Api. Consumes elements and encodes them for
+ * transmission.
+ *
+ * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s.
+ * For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish.
+ */
+public class BeamFnDataWriteRunner<InputT> {
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final String URN = "urn:org.apache.beam:sink:runner:0.1";
+
+  /** A registrar which provides a factory to handle writing to the Fn Api Data Plane. */
+  @AutoService(PTransformRunnerFactory.Registrar.class)
+  public static class Registrar implements
+      PTransformRunnerFactory.Registrar {
+
+    @Override
+    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
+      return ImmutableMap.of(URN, new Factory());
+    }
+  }
+
+  /** A factory for {@link BeamFnDataWriteRunner}s. */
+  static class Factory<InputT>
+      implements PTransformRunnerFactory<BeamFnDataWriteRunner<InputT>> {
+
+    @Override
+    public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(
+        PipelineOptions pipelineOptions,
+        BeamFnDataClient beamFnDataClient,
+        String pTransformId,
+        RunnerApi.PTransform pTransform,
+        Supplier<String> processBundleInstructionId,
+        Map<String, RunnerApi.PCollection> pCollections,
+        Map<String, RunnerApi.Coder> coders,
+        Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+        Consumer<ThrowingRunnable> addStartFunction,
+        Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
+      BeamFnApi.Target target = BeamFnApi.Target.newBuilder()
+          .setPrimitiveTransformReference(pTransformId)
+          .setName(getOnlyElement(pTransform.getInputsMap().keySet()))
+          .build();
+      RunnerApi.Coder coderSpec = coders.get(
+          pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId());
+      BeamFnDataWriteRunner<InputT> runner =
+          new BeamFnDataWriteRunner<>(
+              pTransform.getSpec(),
+              processBundleInstructionId,
+              target,
+              coderSpec,
+              beamFnDataClient);
+      addStartFunction.accept(runner::registerForOutput);
+      pCollectionIdsToConsumers.put(
+          getOnlyElement(pTransform.getInputsMap().values()),
+          (ThrowingConsumer)
+              (ThrowingConsumer<WindowedValue<InputT>>) runner::consume);
+      addFinishFunction.accept(runner::close);
+      return runner;
+    }
+  }
+
+  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final BeamFnApi.Target outputTarget;
+  private final Coder<WindowedValue<InputT>> coder;
+  private final BeamFnDataClient beamFnDataClientFactory;
+  private final Supplier<String> processBundleInstructionIdSupplier;
+
+  private CloseableThrowingConsumer<WindowedValue<InputT>> consumer;
+
+  BeamFnDataWriteRunner(
+      RunnerApi.FunctionSpec functionSpec,
+      Supplier<String> processBundleInstructionIdSupplier,
+      BeamFnApi.Target outputTarget,
+      RunnerApi.Coder coderSpec,
+      BeamFnDataClient beamFnDataClientFactory)
+          throws IOException {
+    this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
+        .getApiServiceDescriptor();
+    this.beamFnDataClientFactory = beamFnDataClientFactory;
+    this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
+    this.outputTarget = outputTarget;
+
+    @SuppressWarnings("unchecked")
+    Coder<WindowedValue<InputT>> coder =
+        (Coder<WindowedValue<InputT>>)
+            CloudObjects.coderFromCloudObject(
+                CloudObject.fromSpec(
+                    OBJECT_MAPPER.readValue(
+                        coderSpec
+                            .getSpec()
+                            .getSpec()
+                            .getParameter()
+                            .unpack(BytesValue.class)
+                            .getValue()
+                            .newInput(),
+                        Map.class)));
+    this.coder = coder;
+  }
+
+  public void registerForOutput() {
+    consumer = beamFnDataClientFactory.forOutboundConsumer(
+        apiServiceDescriptor,
+        KV.of(processBundleInstructionIdSupplier.get(), outputTarget),
+        coder);
+  }
+
+  public void close() throws Exception {
+    consumer.close();
+  }
+
+  public void consume(WindowedValue<InputT> value) throws Exception {
+    consumer.accept(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
new file mode 100644
index 0000000..977e803
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source.Reader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A runner which creates {@link Reader}s for each {@link BoundedSource} sent as an input and
+ * executes the {@link Reader}s read loop.
+ */
+public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> {
+
+  private static final String URN = "urn:org.apache.beam:source:java:0.1";
+
+  /** A registrar which provides a factory to handle Java {@link BoundedSource}s. */
+  @AutoService(PTransformRunnerFactory.Registrar.class)
+  public static class Registrar implements
+      PTransformRunnerFactory.Registrar {
+
+    @Override
+    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
+      return ImmutableMap.of(URN, new Factory());
+    }
+  }
+
+  /** A factory for {@link BoundedSourceRunner}. */
+  static class Factory<InputT extends BoundedSource<OutputT>, OutputT>
+      implements PTransformRunnerFactory<BoundedSourceRunner<InputT, OutputT>> {
+    @Override
+    public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform(
+        PipelineOptions pipelineOptions,
+        BeamFnDataClient beamFnDataClient,
+        String pTransformId,
+        RunnerApi.PTransform pTransform,
+        Supplier<String> processBundleInstructionId,
+        Map<String, RunnerApi.PCollection> pCollections,
+        Map<String, RunnerApi.Coder> coders,
+        Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+        Consumer<ThrowingRunnable> addStartFunction,
+        Consumer<ThrowingRunnable> addFinishFunction) {
+
+      ImmutableList.Builder<ThrowingConsumer<WindowedValue<?>>> consumers = ImmutableList.builder();
+      for (String pCollectionId : pTransform.getOutputsMap().values()) {
+        consumers.addAll(pCollectionIdsToConsumers.get(pCollectionId));
+      }
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      BoundedSourceRunner<InputT, OutputT> runner = new BoundedSourceRunner(
+          pipelineOptions,
+          pTransform.getSpec(),
+          consumers.build());
+
+      // TODO: Remove and replace with source being sent across gRPC port
+      addStartFunction.accept(runner::start);
+
+      ThrowingConsumer runReadLoop =
+          (ThrowingConsumer<WindowedValue<InputT>>) runner::runReadLoop;
+      for (String pCollectionId : pTransform.getInputsMap().values()) {
+        pCollectionIdsToConsumers.put(
+            pCollectionId,
+            runReadLoop);
+      }
+
+      return runner;
+    }
+  }
+
+  private final PipelineOptions pipelineOptions;
+  private final RunnerApi.FunctionSpec definition;
+  private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
+
+  BoundedSourceRunner(
+      PipelineOptions pipelineOptions,
+      RunnerApi.FunctionSpec definition,
+      Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers) {
+    this.pipelineOptions = pipelineOptions;
+    this.definition = definition;
+    this.consumers = consumers;
+  }
+
+  /**
+   * The runner harness is meant to send the source over the Beam Fn Data API which would be
+   * consumed by the {@link #runReadLoop}. Drop this method once the runner harness sends the
+   * source instead of unpacking it from the data block of the function specification.
+   */
+  @Deprecated
+  public void start() throws Exception {
+    try {
+      // The representation here is defined as the java serialized representation of the
+      // bounded source object packed into a protobuf Any using a protobuf BytesValue wrapper.
+      byte[] bytes = definition.getParameter().unpack(BytesValue.class).getValue().toByteArray();
+      @SuppressWarnings("unchecked")
+      InputT boundedSource =
+          (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString());
+      runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource));
+    } catch (InvalidProtocolBufferException e) {
+      throw new IOException(
+          String.format("Failed to decode %s, expected %s",
+              definition.getParameter().getTypeUrl(), BytesValue.getDescriptor().getFullName()),
+          e);
+    }
+  }
+
+  /**
+   * Creates a {@link Reader} for each {@link BoundedSource} and executes the {@link Reader}s
+   * read loop. See {@link Reader} for further details of the read loop.
+   *
+   * <p>Propagates any exceptions caused during reading or processing via a consumer to the
+   * caller.
+   */
+  public void runReadLoop(WindowedValue<InputT> value) throws Exception {
+    try (Reader<OutputT> reader = value.getValue().createReader(pipelineOptions)) {
+      if (!reader.start()) {
+        // Reader has no data, immediately return
+        return;
+      }
+      do {
+        // TODO: Should this use the input window as the window for all the outputs?
+        WindowedValue<OutputT> nextValue = WindowedValue.timestampedValueInGlobalWindow(
+            reader.getCurrent(), reader.getCurrentTimestamp());
+        for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) {
+          consumer.accept(nextValue);
+        }
+      } while (reader.advance());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return definition.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
new file mode 100644
index 0000000..97bd71c
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.dataflow.util.DoFnInfo;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
+
+/**
+ * A {@link DoFnRunner} specific to integrating with the Fn Api. This is to remove the layers
+ * of abstraction caused by StateInternals/TimerInternals since they model state and timer
+ * concepts differently.
+ */
+public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+  /**
+   * A registrar which provides a factory to handle Java {@link DoFn}s.
+   */
+  @AutoService(PTransformRunnerFactory.Registrar.class)
+  public static class Registrar implements
+      PTransformRunnerFactory.Registrar {
+
+    @Override
+    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
+      return ImmutableMap.of(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN, new Factory());
+    }
+  }
+
+  /** A factory for {@link FnApiDoFnRunner}. */
+  static class Factory<InputT, OutputT>
+      implements PTransformRunnerFactory<DoFnRunner<InputT, OutputT>> {
+
+    @Override
+    public DoFnRunner<InputT, OutputT> createRunnerForPTransform(
+        PipelineOptions pipelineOptions,
+        BeamFnDataClient beamFnDataClient,
+        String pTransformId,
+        RunnerApi.PTransform pTransform,
+        Supplier<String> processBundleInstructionId,
+        Map<String, RunnerApi.PCollection> pCollections,
+        Map<String, RunnerApi.Coder> coders,
+        Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+        Consumer<ThrowingRunnable> addStartFunction,
+        Consumer<ThrowingRunnable> addFinishFunction) {
+
+      // For every output PCollection, create a map from output name to Consumer
+      ImmutableMap.Builder<String, Collection<ThrowingConsumer<WindowedValue<?>>>>
+          outputMapBuilder = ImmutableMap.builder();
+      for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) {
+        outputMapBuilder.put(
+            entry.getKey(),
+            pCollectionIdsToConsumers.get(entry.getValue()));
+      }
+      ImmutableMap<String, Collection<ThrowingConsumer<WindowedValue<?>>>> outputMap =
+          outputMapBuilder.build();
+
+      // Get the DoFnInfo from the serialized blob.
+      ByteString serializedFn;
+      try {
+        serializedFn = pTransform.getSpec().getParameter().unpack(BytesValue.class).getValue();
+      } catch (InvalidProtocolBufferException e) {
+        throw new IllegalArgumentException(
+            String.format("Unable to unwrap DoFn %s", pTransform.getSpec()), e);
+      }
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      DoFnInfo<InputT, OutputT> doFnInfo = (DoFnInfo) SerializableUtils.deserializeFromByteArray(
+          serializedFn.toByteArray(), "DoFnInfo");
+
+      // Verify that the DoFnInfo tag to output map matches the output map on the PTransform.
+      checkArgument(
+          Objects.equals(
+              new HashSet<>(Collections2.transform(outputMap.keySet(), Long::parseLong)),
+              doFnInfo.getOutputMap().keySet()),
+          "Unexpected mismatch between transform output map %s and DoFnInfo output map %s.",
+          outputMap.keySet(),
+          doFnInfo.getOutputMap());
+
+      ImmutableMultimap.Builder<TupleTag<?>,
+          ThrowingConsumer<WindowedValue<?>>> tagToOutputMapBuilder =
+          ImmutableMultimap.builder();
+      for (Map.Entry<Long, TupleTag<?>> entry : doFnInfo.getOutputMap().entrySet()) {
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Collection<ThrowingConsumer<WindowedValue<?>>> consumers =
+            outputMap.get(Long.toString(entry.getKey()));
+        tagToOutputMapBuilder.putAll(entry.getValue(), consumers);
+      }
+
+      ImmutableMultimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> tagToOutputMap =
+          tagToOutputMapBuilder.build();
+
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      DoFnRunner<InputT, OutputT> runner = new FnApiDoFnRunner<>(
+          pipelineOptions,
+          doFnInfo.getDoFn(),
+          (Collection<ThrowingConsumer<WindowedValue<OutputT>>>) (Collection)
+              tagToOutputMap.get(doFnInfo.getOutputMap().get(doFnInfo.getMainOutput())),
+          tagToOutputMap,
+          doFnInfo.getWindowingStrategy());
+
+      // Register the appropriate handlers.
+      addStartFunction.accept(runner::startBundle);
+      for (String pcollectionId : pTransform.getInputsMap().values()) {
+        pCollectionIdsToConsumers.put(
+            pcollectionId,
+            (ThrowingConsumer) (ThrowingConsumer<WindowedValue<InputT>>) runner::processElement);
+      }
+      addFinishFunction.accept(runner::finishBundle);
+      return runner;
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  private final PipelineOptions pipelineOptions;
+  private final DoFn<InputT, OutputT> doFn;
+  private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> mainOutputConsumers;
+  private final Multimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> outputMap;
+  private final DoFnInvoker<InputT, OutputT> doFnInvoker;
+  private final StartBundleContext startBundleContext;
+  private final ProcessBundleContext processBundleContext;
+  private final FinishBundleContext finishBundleContext;
+
+  /**
+   * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}.
+   */
+  private WindowedValue<InputT> currentElement;
+
+  /**
+   * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}.
+   */
+  private BoundedWindow currentWindow;
+
+  FnApiDoFnRunner(
+      PipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      Collection<ThrowingConsumer<WindowedValue<OutputT>>> mainOutputConsumers,
+      Multimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> outputMap,
+      WindowingStrategy windowingStrategy) {
+    this.pipelineOptions = pipelineOptions;
+    this.doFn = doFn;
+    this.mainOutputConsumers = mainOutputConsumers;
+    this.outputMap = outputMap;
+    this.doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    this.startBundleContext = new StartBundleContext();
+    this.processBundleContext = new ProcessBundleContext();
+    this.finishBundleContext = new FinishBundleContext();
+  }
+
+  @Override
+  public void startBundle() {
+    doFnInvoker.invokeStartBundle(startBundleContext);
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    currentElement = elem;
+    try {
+      Iterator<BoundedWindow> windowIterator =
+          (Iterator<BoundedWindow>) elem.getWindows().iterator();
+      while (windowIterator.hasNext()) {
+        currentWindow = windowIterator.next();
+        doFnInvoker.invokeProcessElement(processBundleContext);
+      }
+    } finally {
+      currentElement = null;
+      currentWindow = null;
+    }
+  }
+
+  @Override
+  public void onTimer(
+      String timerId,
+      BoundedWindow window,
+      Instant timestamp,
+      TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("TODO: Add support for timers");
+  }
+
+  @Override
+  public void finishBundle() {
+    doFnInvoker.invokeFinishBundle(finishBundleContext);
+  }
+
+  /**
+   * Outputs the given element to the specified set of consumers wrapping any exceptions.
+   */
+  private <T> void outputTo(
+      Collection<ThrowingConsumer<WindowedValue<T>>> consumers,
+      WindowedValue<T> output) {
+    Iterator<ThrowingConsumer<WindowedValue<T>>> consumerIterator;
+    try {
+      for (ThrowingConsumer<WindowedValue<T>> consumer : consumers) {
+        consumer.accept(output);
+      }
+    } catch (Throwable t) {
+      throw UserCodeException.wrap(t);
+    }
+  }
+
+  /**
+   * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.StartBundle @StartBundle}.
+   */
+  private class StartBundleContext
+      extends DoFn<InputT, OutputT>.StartBundleContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private StartBundleContext() {
+      doFn.super();
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public PipelineOptions pipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      throw new UnsupportedOperationException(
+          "Cannot access window outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access FinishBundleContext outside of @FinishBundle method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access ProcessContext outside of @ProcessElement method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access OnTimerContext outside of @OnTimer methods.");
+    }
+
+    @Override
+    public RestrictionTracker<?> restrictionTracker() {
+      throw new UnsupportedOperationException(
+          "Cannot access RestrictionTracker outside of @ProcessElement method.");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException(
+          "Cannot access state outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException(
+          "Cannot access timers outside of @ProcessElement and @OnTimer methods.");
+    }
+  }
+
+  /**
+   * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.ProcessElement @ProcessElement}.
+   */
+  private class ProcessBundleContext
+      extends DoFn<InputT, OutputT>.ProcessContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private ProcessBundleContext() {
+      doFn.super();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return currentWindow;
+    }
+
+    @Override
+    public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access StartBundleContext outside of @StartBundle method.");
+    }
+
+    @Override
+    public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access FinishBundleContext outside of @FinishBundle method.");
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("TODO: Add support for timers");
+    }
+
+    @Override
+    public RestrictionTracker<?> restrictionTracker() {
+      throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("TODO: Add support for state");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("TODO: Add support for timers");
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public PipelineOptions pipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public void output(OutputT output) {
+      outputTo(mainOutputConsumers,
+          WindowedValue.of(
+              output,
+              currentElement.getTimestamp(),
+              currentWindow,
+              currentElement.getPane()));
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      outputTo(mainOutputConsumers,
+          WindowedValue.of(
+              output,
+              timestamp,
+              currentWindow,
+              currentElement.getPane()));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection) outputMap.get(tag);
+      if (consumers == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      outputTo(consumers,
+          WindowedValue.of(
+              output,
+              currentElement.getTimestamp(),
+              currentWindow,
+              currentElement.getPane()));
+    }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection) outputMap.get(tag);
+      if (consumers == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      outputTo(consumers,
+          WindowedValue.of(
+              output,
+              timestamp,
+              currentWindow,
+              currentElement.getPane()));
+    }
+
+    @Override
+    public InputT element() {
+      return currentElement.getValue();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      throw new UnsupportedOperationException("TODO: Support side inputs");
+    }
+
+    @Override
+    public Instant timestamp() {
+      return currentElement.getTimestamp();
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return currentElement.getPane();
+    }
+
+    @Override
+    public void updateWatermark(Instant watermark) {
+      throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn");
+    }
+  }
+
+  /**
+   * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.FinishBundle @FinishBundle}.
+   */
+  private class FinishBundleContext
+      extends DoFn<InputT, OutputT>.FinishBundleContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private FinishBundleContext() {
+      doFn.super();
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public PipelineOptions pipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      throw new UnsupportedOperationException(
+          "Cannot access window outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access StartBundleContext outside of @StartBundle method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access ProcessContext outside of @ProcessElement method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access OnTimerContext outside of @OnTimer methods.");
+    }
+
+    @Override
+    public RestrictionTracker<?> restrictionTracker() {
+      throw new UnsupportedOperationException(
+          "Cannot access RestrictionTracker outside of @ProcessElement method.");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException(
+          "Cannot access state outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException(
+          "Cannot access timers outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public void output(OutputT output, Instant timestamp, BoundedWindow window) {
+      outputTo(mainOutputConsumers,
+          WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
+      Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection) outputMap.get(tag);
+      if (consumers == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      outputTo(consumers,
+          WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
new file mode 100644
index 0000000..7cf0610
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A factory able to instantiate an appropriate handler for a given PTransform.
+ */
+public interface PTransformRunnerFactory<T> {
+
+  /**
+   * Creates and returns a handler for a given PTransform. Note that the handler must support
+   * processing multiple bundles. The handler will be discarded if an error is thrown during
+   * element processing, or during execution of start/finish.
+   *
+   * @param pipelineOptions Pipeline options
+   * @param beamFnDataClient
+   * @param pTransformId The id of the PTransform.
+   * @param pTransform The PTransform definition.
+   * @param processBundleInstructionId A supplier containing the active process bundle instruction
+   * id.
+   * @param pCollections A mapping from PCollection id to PCollection definition.
+   * @param coders A mapping from coder id to coder definition.
+   * @param pCollectionIdsToConsumers A mapping from PCollection id to a collection of consumers.
+   * Note that if this handler is a consumer, it should register itself within this multimap under
+   * the appropriate PCollection ids. Also note that all output consumers needed by this PTransform
+   * (based on the values of the {@link RunnerApi.PTransform#getOutputsMap()} will have already
+   * registered within this multimap.
+   * @param addStartFunction A consumer to register a start bundle handler with.
+   * @param addFinishFunction A consumer to register a finish bundle handler with.
+   */
+  T createRunnerForPTransform(
+      PipelineOptions pipelineOptions,
+      BeamFnDataClient beamFnDataClient,
+      String pTransformId,
+      RunnerApi.PTransform pTransform,
+      Supplier<String> processBundleInstructionId,
+      Map<String, RunnerApi.PCollection> pCollections,
+      Map<String, RunnerApi.Coder> coders,
+      Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+      Consumer<ThrowingRunnable> addStartFunction,
+      Consumer<ThrowingRunnable> addFinishFunction) throws IOException;
+
+  /**
+   * A registrar which can return a mapping from {@link RunnerApi.FunctionSpec#getUrn()} to
+   * a factory capable of instantiating an appropriate handler.
+   */
+  interface Registrar {
+    /**
+     * Returns a mapping from {@link RunnerApi.FunctionSpec#getUrn()} to a factory capable of
+     * instantiating an appropriate handler.
+     */
+    Map<String, PTransformRunnerFactory> getPTransformRunnerFactories();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 2a9cef8..1e73570 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -34,12 +34,12 @@ import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import org.apache.beam.fn.harness.PTransformRunnerFactory;
+import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.PTransformRunnerFactory;
-import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
deleted file mode 100644
index 9339347..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import static com.google.common.collect.Iterables.getOnlyElement;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Registers as a consumer for data over the Beam Fn API. Multiplexes any received data
- * to all consumers in the specified output map.
- *
- * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s.
- * For each request, call {@link #registerInputLocation()} to start and call
- * {@link #blockTillReadFinishes()} to finish.
- */
-public class BeamFnDataReadRunner<OutputT> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-  private static final String URN = "urn:org.apache.beam:source:runner:0.1";
-
-  /** A registrar which provides a factory to handle reading from the Fn Api Data Plane. */
-  @AutoService(PTransformRunnerFactory.Registrar.class)
-  public static class Registrar implements
-      PTransformRunnerFactory.Registrar {
-
-    @Override
-    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
-      return ImmutableMap.of(URN, new Factory());
-    }
-  }
-
-  /** A factory for {@link BeamFnDataReadRunner}s. */
-  static class Factory<OutputT>
-      implements PTransformRunnerFactory<BeamFnDataReadRunner<OutputT>> {
-
-    @Override
-    public BeamFnDataReadRunner<OutputT> createRunnerForPTransform(
-        PipelineOptions pipelineOptions,
-        BeamFnDataClient beamFnDataClient,
-        String pTransformId,
-        RunnerApi.PTransform pTransform,
-        Supplier<String> processBundleInstructionId,
-        Map<String, RunnerApi.PCollection> pCollections,
-        Map<String, RunnerApi.Coder> coders,
-        Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
-        Consumer<ThrowingRunnable> addStartFunction,
-        Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
-
-      BeamFnApi.Target target = BeamFnApi.Target.newBuilder()
-          .setPrimitiveTransformReference(pTransformId)
-          .setName(getOnlyElement(pTransform.getOutputsMap().keySet()))
-          .build();
-      RunnerApi.Coder coderSpec = coders.get(pCollections.get(
-          getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
-      Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers =
-          (Collection) pCollectionIdsToConsumers.get(
-              getOnlyElement(pTransform.getOutputsMap().values()));
-
-      BeamFnDataReadRunner<OutputT> runner = new BeamFnDataReadRunner<>(
-          pTransform.getSpec(),
-          processBundleInstructionId,
-          target,
-          coderSpec,
-          beamFnDataClient,
-          consumers);
-      addStartFunction.accept(runner::registerInputLocation);
-      addFinishFunction.accept(runner::blockTillReadFinishes);
-      return runner;
-    }
-  }
-
-  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
-  private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
-  private final Supplier<String> processBundleInstructionIdSupplier;
-  private final BeamFnDataClient beamFnDataClientFactory;
-  private final Coder<WindowedValue<OutputT>> coder;
-  private final BeamFnApi.Target inputTarget;
-
-  private CompletableFuture<Void> readFuture;
-
-  BeamFnDataReadRunner(
-      RunnerApi.FunctionSpec functionSpec,
-      Supplier<String> processBundleInstructionIdSupplier,
-      BeamFnApi.Target inputTarget,
-      RunnerApi.Coder coderSpec,
-      BeamFnDataClient beamFnDataClientFactory,
-      Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers)
-          throws IOException {
-    this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
-        .getApiServiceDescriptor();
-    this.inputTarget = inputTarget;
-    this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
-    this.beamFnDataClientFactory = beamFnDataClientFactory;
-    this.consumers = consumers;
-
-    @SuppressWarnings("unchecked")
-    Coder<WindowedValue<OutputT>> coder =
-        (Coder<WindowedValue<OutputT>>)
-            CloudObjects.coderFromCloudObject(
-                CloudObject.fromSpec(
-                    OBJECT_MAPPER.readValue(
-                        coderSpec
-                            .getSpec()
-                            .getSpec()
-                            .getParameter()
-                            .unpack(BytesValue.class)
-                            .getValue()
-                            .newInput(),
-                        Map.class)));
-    this.coder = coder;
-  }
-
-  public void registerInputLocation() {
-    this.readFuture = beamFnDataClientFactory.forInboundConsumer(
-        apiServiceDescriptor,
-        KV.of(processBundleInstructionIdSupplier.get(), inputTarget),
-        coder,
-        this::multiplexToConsumers);
-  }
-
-  public void blockTillReadFinishes() throws Exception {
-    LOG.debug("Waiting for process bundle instruction {} and target {} to close.",
-        processBundleInstructionIdSupplier.get(), inputTarget);
-    readFuture.get();
-  }
-
-  private void multiplexToConsumers(WindowedValue<OutputT> value) throws Exception {
-    for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) {
-      consumer.accept(value);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
deleted file mode 100644
index c2a996b..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import static com.google.common.collect.Iterables.getOnlyElement;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
-import java.io.IOException;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * Registers as a consumer with the Beam Fn Data Api. Consumes elements and encodes them for
- * transmission.
- *
- * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s.
- * For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish.
- */
-public class BeamFnDataWriteRunner<InputT> {
-
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-  private static final String URN = "urn:org.apache.beam:sink:runner:0.1";
-
-  /** A registrar which provides a factory to handle writing to the Fn Api Data Plane. */
-  @AutoService(PTransformRunnerFactory.Registrar.class)
-  public static class Registrar implements
-      PTransformRunnerFactory.Registrar {
-
-    @Override
-    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
-      return ImmutableMap.of(URN, new Factory());
-    }
-  }
-
-  /** A factory for {@link BeamFnDataWriteRunner}s. */
-  static class Factory<InputT>
-      implements PTransformRunnerFactory<BeamFnDataWriteRunner<InputT>> {
-
-    @Override
-    public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(
-        PipelineOptions pipelineOptions,
-        BeamFnDataClient beamFnDataClient,
-        String pTransformId,
-        RunnerApi.PTransform pTransform,
-        Supplier<String> processBundleInstructionId,
-        Map<String, RunnerApi.PCollection> pCollections,
-        Map<String, RunnerApi.Coder> coders,
-        Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
-        Consumer<ThrowingRunnable> addStartFunction,
-        Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
-      BeamFnApi.Target target = BeamFnApi.Target.newBuilder()
-          .setPrimitiveTransformReference(pTransformId)
-          .setName(getOnlyElement(pTransform.getInputsMap().keySet()))
-          .build();
-      RunnerApi.Coder coderSpec = coders.get(
-          pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId());
-      BeamFnDataWriteRunner<InputT> runner =
-          new BeamFnDataWriteRunner<>(
-              pTransform.getSpec(),
-              processBundleInstructionId,
-              target,
-              coderSpec,
-              beamFnDataClient);
-      addStartFunction.accept(runner::registerForOutput);
-      pCollectionIdsToConsumers.put(
-          getOnlyElement(pTransform.getInputsMap().values()),
-          (ThrowingConsumer)
-              (ThrowingConsumer<WindowedValue<InputT>>) runner::consume);
-      addFinishFunction.accept(runner::close);
-      return runner;
-    }
-  }
-
-  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
-  private final BeamFnApi.Target outputTarget;
-  private final Coder<WindowedValue<InputT>> coder;
-  private final BeamFnDataClient beamFnDataClientFactory;
-  private final Supplier<String> processBundleInstructionIdSupplier;
-
-  private CloseableThrowingConsumer<WindowedValue<InputT>> consumer;
-
-  BeamFnDataWriteRunner(
-      RunnerApi.FunctionSpec functionSpec,
-      Supplier<String> processBundleInstructionIdSupplier,
-      BeamFnApi.Target outputTarget,
-      RunnerApi.Coder coderSpec,
-      BeamFnDataClient beamFnDataClientFactory)
-          throws IOException {
-    this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
-        .getApiServiceDescriptor();
-    this.beamFnDataClientFactory = beamFnDataClientFactory;
-    this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
-    this.outputTarget = outputTarget;
-
-    @SuppressWarnings("unchecked")
-    Coder<WindowedValue<InputT>> coder =
-        (Coder<WindowedValue<InputT>>)
-            CloudObjects.coderFromCloudObject(
-                CloudObject.fromSpec(
-                    OBJECT_MAPPER.readValue(
-                        coderSpec
-                            .getSpec()
-                            .getSpec()
-                            .getParameter()
-                            .unpack(BytesValue.class)
-                            .getValue()
-                            .newInput(),
-                        Map.class)));
-    this.coder = coder;
-  }
-
-  public void registerForOutput() {
-    consumer = beamFnDataClientFactory.forOutboundConsumer(
-        apiServiceDescriptor,
-        KV.of(processBundleInstructionIdSupplier.get(), outputTarget),
-        coder);
-  }
-
-  public void close() throws Exception {
-    consumer.close();
-  }
-
-  public void consume(WindowedValue<InputT> value) throws Exception {
-    consumer.accept(value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
deleted file mode 100644
index 3338c3a..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source.Reader;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * A runner which creates {@link Reader}s for each {@link BoundedSource} sent as an input and
- * executes the {@link Reader}s read loop.
- */
-public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> {
-
-  private static final String URN = "urn:org.apache.beam:source:java:0.1";
-
-  /** A registrar which provides a factory to handle Java {@link BoundedSource}s. */
-  @AutoService(PTransformRunnerFactory.Registrar.class)
-  public static class Registrar implements
-      PTransformRunnerFactory.Registrar {
-
-    @Override
-    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
-      return ImmutableMap.of(URN, new Factory());
-    }
-  }
-
-  /** A factory for {@link BoundedSourceRunner}. */
-  static class Factory<InputT extends BoundedSource<OutputT>, OutputT>
-      implements PTransformRunnerFactory<BoundedSourceRunner<InputT, OutputT>> {
-    @Override
-    public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform(
-        PipelineOptions pipelineOptions,
-        BeamFnDataClient beamFnDataClient,
-        String pTransformId,
-        RunnerApi.PTransform pTransform,
-        Supplier<String> processBundleInstructionId,
-        Map<String, RunnerApi.PCollection> pCollections,
-        Map<String, RunnerApi.Coder> coders,
-        Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
-        Consumer<ThrowingRunnable> addStartFunction,
-        Consumer<ThrowingRunnable> addFinishFunction) {
-
-      ImmutableList.Builder<ThrowingConsumer<WindowedValue<?>>> consumers = ImmutableList.builder();
-      for (String pCollectionId : pTransform.getOutputsMap().values()) {
-        consumers.addAll(pCollectionIdsToConsumers.get(pCollectionId));
-      }
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      BoundedSourceRunner<InputT, OutputT> runner = new BoundedSourceRunner(
-          pipelineOptions,
-          pTransform.getSpec(),
-          consumers.build());
-
-      // TODO: Remove and replace with source being sent across gRPC port
-      addStartFunction.accept(runner::start);
-
-      ThrowingConsumer runReadLoop =
-          (ThrowingConsumer<WindowedValue<InputT>>) runner::runReadLoop;
-      for (String pCollectionId : pTransform.getInputsMap().values()) {
-        pCollectionIdsToConsumers.put(
-            pCollectionId,
-            runReadLoop);
-      }
-
-      return runner;
-    }
-  }
-
-  private final PipelineOptions pipelineOptions;
-  private final RunnerApi.FunctionSpec definition;
-  private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
-
-  BoundedSourceRunner(
-      PipelineOptions pipelineOptions,
-      RunnerApi.FunctionSpec definition,
-      Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers) {
-    this.pipelineOptions = pipelineOptions;
-    this.definition = definition;
-    this.consumers = consumers;
-  }
-
-  /**
-   * The runner harness is meant to send the source over the Beam Fn Data API which would be
-   * consumed by the {@link #runReadLoop}. Drop this method once the runner harness sends the
-   * source instead of unpacking it from the data block of the function specification.
-   */
-  @Deprecated
-  public void start() throws Exception {
-    try {
-      // The representation here is defined as the java serialized representation of the
-      // bounded source object packed into a protobuf Any using a protobuf BytesValue wrapper.
-      byte[] bytes = definition.getParameter().unpack(BytesValue.class).getValue().toByteArray();
-      @SuppressWarnings("unchecked")
-      InputT boundedSource =
-          (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString());
-      runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource));
-    } catch (InvalidProtocolBufferException e) {
-      throw new IOException(
-          String.format("Failed to decode %s, expected %s",
-              definition.getParameter().getTypeUrl(), BytesValue.getDescriptor().getFullName()),
-          e);
-    }
-  }
-
-  /**
-   * Creates a {@link Reader} for each {@link BoundedSource} and executes the {@link Reader}s
-   * read loop. See {@link Reader} for further details of the read loop.
-   *
-   * <p>Propagates any exceptions caused during reading or processing via a consumer to the
-   * caller.
-   */
-  public void runReadLoop(WindowedValue<InputT> value) throws Exception {
-    try (Reader<OutputT> reader = value.getValue().createReader(pipelineOptions)) {
-      if (!reader.start()) {
-        // Reader has no data, immediately return
-        return;
-      }
-      do {
-        // TODO: Should this use the input window as the window for all the outputs?
-        WindowedValue<OutputT> nextValue = WindowedValue.timestampedValueInGlobalWindow(
-            reader.getCurrent(), reader.getCurrentTimestamp());
-        for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) {
-          consumer.accept(nextValue);
-        }
-      } while (reader.advance());
-    }
-  }
-
-  @Override
-  public String toString() {
-    return definition.toString();
-  }
-}