You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/30 20:48:55 UTC

[4/6] beam git commit: A proposal for a portability framework to execute user definable functions.

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
new file mode 100644
index 0000000..14e26f0
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.control;
+
+import com.google.protobuf.Message;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnApi.RegisterResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A handler and datastore for types that be can be registered via the Fn API.
+ *
+ * <p>Allows for {@link org.apache.beam.fn.v1.BeamFnApi.RegisterRequest}s to occur in parallel with
+ * subsequent requests that may lookup registered values by blocking lookups until registration
+ * occurs.
+ */
+public class RegisterHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RegisterHandler.class);
+  private final ConcurrentMap<Long, CompletableFuture<Message>> idToObject;
+
+  public RegisterHandler() {
+    idToObject = new ConcurrentHashMap<>();
+  }
+
+  public <T extends Message> T getById(long id) {
+    try {
+      @SuppressWarnings("unchecked")
+      CompletableFuture<T> returnValue = (CompletableFuture<T>) computeIfAbsent(id);
+      /*
+       * TODO: Even though the register request instruction occurs before the process bundle
+       * instruction in the control stream, the instructions are being processed in parallel
+       * in the Java harness causing a data race which is why we use a future. This will block
+       * forever in the case of a runner having a bug sending the wrong ids. Instead of blocking
+       * forever, we could do a timed wait or come up with another way of ordering the instruction
+       * processing to remove the data race.
+       */
+      return returnValue.get();
+    } catch (ExecutionException e) {
+      throw new RuntimeException(String.format("Failed to load %s", id), e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(String.format("Failed to load %s", id), e);
+    }
+  }
+
+  public BeamFnApi.InstructionResponse.Builder register(BeamFnApi.InstructionRequest request) {
+    BeamFnApi.InstructionResponse.Builder response = BeamFnApi.InstructionResponse.newBuilder()
+        .setRegister(RegisterResponse.getDefaultInstance());
+
+    BeamFnApi.RegisterRequest registerRequest = request.getRegister();
+    for (BeamFnApi.ProcessBundleDescriptor processBundleDescriptor
+        : registerRequest.getProcessBundleDescriptorList()) {
+      LOGGER.debug("Registering {} with type {}",
+          processBundleDescriptor.getId(),
+          processBundleDescriptor.getClass());
+      computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor);
+      for (BeamFnApi.Coder coder : processBundleDescriptor.getCodersList()) {
+        LOGGER.debug("Registering {} with type {}",
+            coder.getFunctionSpec().getId(),
+            coder.getClass());
+        computeIfAbsent(coder.getFunctionSpec().getId()).complete(coder);
+      }
+    }
+
+    return response;
+  }
+
+  private CompletableFuture<Message> computeIfAbsent(long id) {
+    return idToObject.computeIfAbsent(id, (Long ignored) -> new CompletableFuture<>());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java
new file mode 100644
index 0000000..6535555
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Control service client and individual request handlers.
+ */
+package org.apache.beam.fn.harness.control;

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
new file mode 100644
index 0000000..3bf44ab
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A buffering outbound {@link Consumer} for the Beam Fn Data API.
+ *
+ * <p>Encodes individually consumed elements with the provided {@link Coder} producing
+ * a single {@link org.apache.beam.fn.v1.BeamFnApi.Elements} message when the buffer threshold
+ * is surpassed.
+ *
+ * <p>The default buffer threshold can be overridden by specifying the experiment
+ * {@code beam_fn_api_data_buffer_limit=<bytes>}
+ *
+ * <p>TODO: Handle outputting large elements (&gt; 2GiBs). Note that this also applies to the
+ * input side as well.
+ *
+ * <p>TODO: Handle outputting elements that are zero bytes by outputting a single byte as
+ * a marker, detect on the input side that no bytes were read and force reading a single byte.
+ */
+public class BeamFnDataBufferingOutboundObserver<T>
+    implements CloseableThrowingConsumer<WindowedValue<T>> {
+  private static final String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit=";
+  private static final int DEFAULT_BUFFER_LIMIT_BYTES = 1_000_000;
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(BeamFnDataBufferingOutboundObserver.class);
+
+  private long byteCounter;
+  private long counter;
+  private final int bufferLimit;
+  private final Coder<WindowedValue<T>> coder;
+  private final KV<Long, BeamFnApi.Target> outputLocation;
+  private final StreamObserver<BeamFnApi.Elements> outboundObserver;
+  private final ByteString.Output bufferedElements;
+
+  public BeamFnDataBufferingOutboundObserver(
+      PipelineOptions options,
+      KV<Long, BeamFnApi.Target> outputLocation,
+      Coder<WindowedValue<T>> coder,
+      StreamObserver<BeamFnApi.Elements> outboundObserver) {
+    this.bufferLimit = getBufferLimit(options);
+    this.outputLocation = outputLocation;
+    this.coder = coder;
+    this.outboundObserver = outboundObserver;
+    this.bufferedElements = ByteString.newOutput();
+  }
+
+  /**
+   * Returns the {@code beam_fn_api_data_buffer_limit=<int>} experiment value if set. Otherwise
+   * returns the default buffer limit.
+   */
+  private static int getBufferLimit(PipelineOptions options) {
+    List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+    for (String experiment : experiments == null ? Collections.<String>emptyList() : experiments) {
+      if (experiment.startsWith(BEAM_FN_API_DATA_BUFFER_LIMIT)) {
+        return Integer.parseInt(experiment.substring(BEAM_FN_API_DATA_BUFFER_LIMIT.length()));
+      }
+    }
+    return DEFAULT_BUFFER_LIMIT_BYTES;
+  }
+
+  @Override
+  public void close() throws Exception {
+    BeamFnApi.Elements.Builder elements = convertBufferForTransmission();
+    // This will add an empty data block representing the end of stream.
+    elements.addDataBuilder()
+        .setInstructionReference(outputLocation.getKey())
+        .setTarget(outputLocation.getValue());
+
+    LOGGER.debug("Closing stream for instruction {} and "
+        + "target {} having transmitted {} values {} bytes",
+        outputLocation.getKey(),
+        outputLocation.getValue(),
+        counter,
+        byteCounter);
+    outboundObserver.onNext(elements.build());
+  }
+
+  @Override
+  public void accept(WindowedValue<T> t) throws IOException {
+    coder.encode(t, bufferedElements, Context.NESTED);
+    counter += 1;
+    if (bufferedElements.size() >= bufferLimit) {
+      outboundObserver.onNext(convertBufferForTransmission().build());
+    }
+  }
+
+  private BeamFnApi.Elements.Builder convertBufferForTransmission() {
+    BeamFnApi.Elements.Builder elements = BeamFnApi.Elements.newBuilder();
+    if (bufferedElements.size() == 0) {
+      return elements;
+    }
+
+    elements.addDataBuilder()
+        .setInstructionReference(outputLocation.getKey())
+        .setTarget(outputLocation.getValue())
+        .setData(bufferedElements.toByteString());
+
+    byteCounter += bufferedElements.size();
+    bufferedElements.reset();
+    return elements;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
new file mode 100644
index 0000000..27b1acb
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.data;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * The {@link BeamFnDataClient} is able to forward inbound elements to a consumer and is also a
+ * consumer of outbound elements. Callers can register themselves as consumers for inbound elements
+ * or can get a handle for a consumer for outbound elements.
+ */
+public interface BeamFnDataClient {
+  /**
+   * Registers the following inbound consumer for the provided instruction id and target.
+   *
+   * <p>The provided coder is used to decode inbound elements. The decoded elements
+   * are passed to the provided consumer. Any failure during decoding or processing of the element
+   * will complete the returned future exceptionally. On successful termination of the stream,
+   * the returned future is completed successfully.
+   *
+   * <p>The consumer is not required to be thread safe.
+   */
+  <T> CompletableFuture<Void> forInboundConsumer(
+      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      KV<Long, BeamFnApi.Target> inputLocation,
+      Coder<WindowedValue<T>> coder,
+      ThrowingConsumer<WindowedValue<T>> consumer);
+
+  /**
+   * Creates a closeable consumer using the provided instruction id and target.
+   *
+   * <p>The provided coder is used to encode elements on the outbound stream.
+   *
+   * <p>Closing the returned consumer signals the end of the stream.
+   *
+   * <p>The returned closeable consumer is not thread safe.
+   */
+  <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer(
+      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      KV<Long, BeamFnApi.Target> outputLocation,
+      Coder<WindowedValue<T>> coder);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
new file mode 100644
index 0000000..9bbdc78
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.data;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnDataGrpc;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link BeamFnDataClient} that uses gRPC for sending and receiving data.
+ *
+ * <p>TODO: Handle closing clients that are currently not a consumer nor are being consumed.
+ */
+public class BeamFnDataGrpcClient implements BeamFnDataClient {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataGrpcClient.class);
+
+  private final ConcurrentMap<BeamFnApi.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer> cache;
+  private final Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory;
+  private final BiFunction<Function<StreamObserver<BeamFnApi.Elements>,
+                                    StreamObserver<BeamFnApi.Elements>>,
+                           StreamObserver<BeamFnApi.Elements>,
+                           StreamObserver<BeamFnApi.Elements>> streamObserverFactory;
+  private final PipelineOptions options;
+
+  public BeamFnDataGrpcClient(
+      PipelineOptions options,
+      Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
+      BiFunction<Function<StreamObserver<BeamFnApi.Elements>, StreamObserver<BeamFnApi.Elements>>,
+                 StreamObserver<BeamFnApi.Elements>,
+                 StreamObserver<BeamFnApi.Elements>> streamObserverFactory) {
+    this.options = options;
+    this.channelFactory = channelFactory;
+    this.streamObserverFactory = streamObserverFactory;
+    this.cache = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Registers the following inbound stream consumer for the provided instruction id and target.
+   *
+   * <p>The provided coder is used to decode elements on the inbound stream. The decoded elements
+   * are passed to the provided consumer. Any failure during decoding or processing of the element
+   * will complete the returned future exceptionally. On successful termination of the stream
+   * (signaled by an empty data block), the returned future is completed successfully.
+   */
+  @Override
+  public <T> CompletableFuture<Void> forInboundConsumer(
+      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      KV<Long, BeamFnApi.Target> inputLocation,
+      Coder<WindowedValue<T>> coder,
+      ThrowingConsumer<WindowedValue<T>> consumer) {
+    LOGGER.debug("Registering consumer instruction {} for target {}",
+        inputLocation.getKey(),
+        inputLocation.getValue());
+
+    CompletableFuture<Void> readFuture = new CompletableFuture<>();
+    BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor);
+    client.futureForKey(inputLocation).complete(
+        new BeamFnDataInboundObserver<>(coder, consumer, readFuture));
+    return readFuture;
+  }
+
+  /**
+   * Creates a closeable consumer using the provided instruction id and target.
+   *
+   * <p>The provided coder is used to encode elements on the outbound stream.
+   *
+   * <p>On closing the returned consumer, an empty data block is sent as a signal of the
+   * logical data stream finishing.
+   *
+   * <p>The returned closeable consumer is not thread safe.
+   */
+  @Override
+  public <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer(
+      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      KV<Long, BeamFnApi.Target> outputLocation,
+      Coder<WindowedValue<T>> coder) {
+    BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor);
+
+    return new BeamFnDataBufferingOutboundObserver<>(
+        options, outputLocation, coder, client.getOutboundObserver());
+  }
+
+  private BeamFnDataGrpcMultiplexer getClientFor(
+      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor) {
+    return cache.computeIfAbsent(apiServiceDescriptor,
+        (BeamFnApi.ApiServiceDescriptor descriptor) -> new BeamFnDataGrpcMultiplexer(
+            descriptor,
+            (StreamObserver<BeamFnApi.Elements> inboundObserver) -> streamObserverFactory.apply(
+                BeamFnDataGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::data,
+                inboundObserver)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
new file mode 100644
index 0000000..ea059df
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A gRPC multiplexer for a specific {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor}.
+ *
+ * <p>Multiplexes data for inbound consumers based upon their individual
+ * {@link org.apache.beam.fn.v1.BeamFnApi.Target}s.
+ *
+ * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those
+ * streams. For inbound streams, this is as thread safe as the inbound observers. For outbound
+ * streams, this is as thread safe as the underlying stream observer.
+ *
+ * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying
+ * the output location with a specific outbound observer.
+ */
+public class BeamFnDataGrpcMultiplexer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class);
+  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final StreamObserver<BeamFnApi.Elements> inboundObserver;
+  private final StreamObserver<BeamFnApi.Elements> outboundObserver;
+  @VisibleForTesting
+  final ConcurrentMap<KV<Long, BeamFnApi.Target>,
+                              CompletableFuture<Consumer<BeamFnApi.Elements.Data>>> consumers;
+
+  public BeamFnDataGrpcMultiplexer(
+      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      Function<StreamObserver<BeamFnApi.Elements>,
+               StreamObserver<BeamFnApi.Elements>> outboundObserverFactory) {
+    this.apiServiceDescriptor = apiServiceDescriptor;
+    this.consumers = new ConcurrentHashMap<>();
+    this.inboundObserver = new InboundObserver();
+    this.outboundObserver = outboundObserverFactory.apply(inboundObserver);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("apiServiceDescriptor", apiServiceDescriptor)
+        .add("consumers", consumers)
+        .toString();
+  }
+
+  public StreamObserver<BeamFnApi.Elements> getInboundObserver() {
+    return inboundObserver;
+  }
+
+  public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
+    return outboundObserver;
+  }
+
+  public CompletableFuture<Consumer<BeamFnApi.Elements.Data>> futureForKey(
+      KV<Long, BeamFnApi.Target> key) {
+    return consumers.computeIfAbsent(
+        key,
+        (KV<Long, BeamFnApi.Target> providedKey) -> new CompletableFuture<>());
+  }
+
+  /**
+   * A multiplexing {@link StreamObserver} that selects the inbound {@link Consumer} to pass
+   * the elements to.
+   *
+   * <p>The inbound observer blocks until the {@link Consumer} is bound allowing for the
+   * sending harness to initiate transmitting data without needing for the receiving harness to
+   * signal that it is ready to consume that data.
+   */
+  private final class InboundObserver implements StreamObserver<BeamFnApi.Elements> {
+    @Override
+    public void onNext(BeamFnApi.Elements value) {
+      for (BeamFnApi.Elements.Data data : value.getDataList()) {
+        try {
+          KV<Long, BeamFnApi.Target> key =
+              KV.of(data.getInstructionReference(), data.getTarget());
+          futureForKey(key).get().accept(data);
+          if (data.getData().isEmpty()) {
+            consumers.remove(key);
+          }
+        /*
+         * TODO: On failure we should fail any bundles that were impacted eagerly
+         * instead of relying on the Runner harness to do all the failure handling.
+         */
+        } catch (ExecutionException | InterruptedException e) {
+          LOGGER.error(
+              "Client interrupted during handling of data for instruction {} and target {}",
+              data.getInstructionReference(),
+              data.getTarget(),
+              e);
+          outboundObserver.onError(e);
+        } catch (RuntimeException e) {
+          LOGGER.error(
+              "Client failed to handle data for instruction {} and target {}",
+              data.getInstructionReference(),
+              data.getTarget(),
+              e);
+          outboundObserver.onError(e);
+        }
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      LOGGER.error("Failed to handle for {}", apiServiceDescriptor, t);
+    }
+
+    @Override
+    public void onCompleted() {
+      LOGGER.warn("Hanged up for {}.", apiServiceDescriptor);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
new file mode 100644
index 0000000..f8b5ab8
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserver.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.beam.fn.harness.fn.ThrowingConsumer;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decodes individually consumed {@link org.apache.beam.fn.v1.BeamFnApi.Elements.Data} with the
+ * provided {@link Coder} passing the individual decoded elements to the provided consumer.
+ */
+public class BeamFnDataInboundObserver<T> implements Consumer<BeamFnApi.Elements.Data> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataInboundObserver.class);
+  private final ThrowingConsumer<WindowedValue<T>> consumer;
+  private final Coder<WindowedValue<T>> coder;
+  private final CompletableFuture<Void> readFuture;
+  private long byteCounter;
+  private long counter;
+
+  public BeamFnDataInboundObserver(
+      Coder<WindowedValue<T>> coder,
+      ThrowingConsumer<WindowedValue<T>> consumer,
+      CompletableFuture<Void> readFuture) {
+    this.coder = coder;
+    this.consumer = consumer;
+    this.readFuture = readFuture;
+  }
+
+  @Override
+  public void accept(BeamFnApi.Elements.Data t) {
+    if (readFuture.isDone()) {
+      // Drop any incoming data if the stream processing has finished.
+      return;
+    }
+    try {
+      if (t.getData().isEmpty()) {
+        LOGGER.debug("Closing stream for instruction {} and "
+            + "target {} having consumed {} values {} bytes",
+            t.getInstructionReference(),
+            t.getTarget(),
+            counter,
+            byteCounter);
+        readFuture.complete(null);
+        return;
+      }
+
+      byteCounter += t.getData().size();
+      InputStream inputStream = t.getData().newInput();
+      while (inputStream.available() > 0) {
+        counter += 1;
+        WindowedValue<T> value = coder.decode(inputStream, Context.NESTED);
+        consumer.accept(value);
+      }
+    } catch (Exception e) {
+      readFuture.completeExceptionally(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java
new file mode 100644
index 0000000..edaaa65
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Data service client and logical stream multiplexing.
+ */
+package org.apache.beam.fn.harness.data;

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
new file mode 100644
index 0000000..b3b7f48
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fake;
+
+import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * A fake implementation of an {@link AggregatorFactory} that is to be filled in at a later time.
+ * The factory returns {@link Aggregator}s that do nothing when a value is added.
+ */
+public class FakeAggregatorFactory implements AggregatorFactory {
+  @Override
+  public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+      Class<?> fnClass,
+      StepContext stepContext,
+      String aggregatorName,
+      CombineFn<InputT, AccumT, OutputT> combine) {
+    return new Aggregator<InputT, OutputT>() {
+      @Override
+      public void addValue(InputT value) {}
+
+      @Override
+      public String getName() {
+        return aggregatorName;
+      }
+
+      @Override
+      public CombineFn<InputT, ?, OutputT> getCombineFn() {
+        return combine;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
new file mode 100644
index 0000000..84da059
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fake;
+
+import java.io.IOException;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * A fake {@link StepContext} factory that performs no-ops.
+ */
+public class FakeStepContext implements StepContext {
+  @Override
+  public String getStepName() {
+    return "TODO";
+  }
+
+  @Override
+  public String getTransformName() {
+    return "TODO";
+  }
+
+  @Override
+  public void noteOutput(WindowedValue<?> output) {
+  }
+
+  @Override
+  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+  }
+
+  @Override
+  public <T, W extends BoundedWindow> void writePCollectionViewData(
+      TupleTag<?> tag,
+      Iterable<WindowedValue<T>> data,
+      Coder<Iterable<WindowedValue<T>>> dataCoder,
+      W window,
+      Coder<W> windowCoder) throws IOException {
+  }
+
+  @Override
+  public StateInternals<?> stateInternals() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TimerInternals timerInternals() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java
new file mode 100644
index 0000000..cd6eb02
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Fake implementations of bindings used with runners-core.
+ */
+package org.apache.beam.fn.harness.fake;

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java
new file mode 100644
index 0000000..59ab149
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/CloseableThrowingConsumer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fn;
+
+/** A {@link ThrowingConsumer} that can be closed. */
+public interface CloseableThrowingConsumer<T> extends AutoCloseable, ThrowingConsumer<T> {
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java
new file mode 100644
index 0000000..9d505da
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingBiFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fn;
+
+import java.util.function.BiFunction;
+
+/**
+ * A {@link BiFunction} which can throw {@link Exception}s.
+ *
+ * <p>Used to expand the allowed set of method references to be used by Java 8
+ * functional interfaces.
+ */
+@FunctionalInterface
+public interface ThrowingBiFunction<T1, T2, T3> {
+  T3 apply(T1 t1, T2 t2) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java
new file mode 100644
index 0000000..b34e857
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingConsumer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fn;
+
+import java.util.function.Consumer;
+
+/**
+ * A {@link Consumer} which can throw {@link Exception}s.
+ *
+ * <p>Used to expand the allowed set of method references to be used by Java 8
+ * functional interfaces.
+ */
+@FunctionalInterface
+public interface ThrowingConsumer<T> {
+  void accept(T t) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java
new file mode 100644
index 0000000..446ff60
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fn;
+
+import java.util.function.Function;
+
+/**
+ * A {@link Function} which can throw {@link Exception}s.
+ *
+ * <p>Used to expand the allowed set of method references to be used by Java 8
+ * functional interfaces.
+ */
+@FunctionalInterface
+public interface ThrowingFunction<T1, T2> {
+  T2 apply(T1 value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java
new file mode 100644
index 0000000..c7fc29e
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/ThrowingRunnable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.fn;
+
+/**
+ * A {@link Runnable} which can throw {@link Exception}s.
+ *
+ * <p>Used to expand the allowed set of method references to be used by Java 8
+ * functional interfaces.
+ */
+@FunctionalInterface
+public interface ThrowingRunnable {
+  void run() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java
new file mode 100644
index 0000000..bbf3396
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fn/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Java 8 functional interface extensions.
+ */
+package org.apache.beam.fn.harness.fn;

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
new file mode 100644
index 0000000..d74d9fa
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.logging;
+
+import static com.google.common.base.Throwables.getStackTraceAsString;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Timestamp;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.logging.Formatter;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogManager;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.logging.SimpleFormatter;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnLoggingGrpc;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Configures {@link java.util.logging} to send all {@link LogRecord}s via the Beam Fn Logging API.
+ */
+public class BeamFnLoggingClient implements AutoCloseable {
+  private static final String ROOT_LOGGER_NAME = "";
+  private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity> LOG_LEVEL_MAP =
+      ImmutableMap.<Level, BeamFnApi.LogEntry.Severity>builder()
+      .put(Level.SEVERE, BeamFnApi.LogEntry.Severity.ERROR)
+      .put(Level.WARNING, BeamFnApi.LogEntry.Severity.WARN)
+      .put(Level.INFO, BeamFnApi.LogEntry.Severity.INFO)
+      .put(Level.FINE, BeamFnApi.LogEntry.Severity.DEBUG)
+      .put(Level.FINEST, BeamFnApi.LogEntry.Severity.TRACE)
+      .build();
+
+  private static final ImmutableMap<DataflowWorkerLoggingOptions.Level, Level> LEVEL_CONFIGURATION =
+      ImmutableMap.<DataflowWorkerLoggingOptions.Level, Level>builder()
+          .put(DataflowWorkerLoggingOptions.Level.OFF, Level.OFF)
+          .put(DataflowWorkerLoggingOptions.Level.ERROR, Level.SEVERE)
+          .put(DataflowWorkerLoggingOptions.Level.WARN, Level.WARNING)
+          .put(DataflowWorkerLoggingOptions.Level.INFO, Level.INFO)
+          .put(DataflowWorkerLoggingOptions.Level.DEBUG, Level.FINE)
+          .put(DataflowWorkerLoggingOptions.Level.TRACE, Level.FINEST)
+          .build();
+
+  private static final Formatter FORMATTER = new SimpleFormatter();
+
+  /* Used to signal to a thread processing a queue to finish its work gracefully. */
+  private static final BeamFnApi.LogEntry POISON_PILL =
+      BeamFnApi.LogEntry.newBuilder().setInstructionReference(Long.MIN_VALUE).build();
+
+  /**
+   * The number of log messages that will be buffered. Assuming log messages are at most 1 KiB,
+   * this represents a buffer of about 10 MiBs.
+   */
+  private static final int MAX_BUFFERED_LOG_ENTRY_COUNT = 10_000;
+
+  /* We need to store a reference to the configured loggers so that they are not
+   * garbage collected. java.util.logging only has weak references to the loggers
+   * so if they are garbage collected, our hierarchical configuration will be lost. */
+  private final Collection<Logger> configuredLoggers;
+  private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
+  private final ManagedChannel channel;
+  private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
+  private final LogControlObserver inboundObserver;
+  private final LogRecordHandler logRecordHandler;
+  private final CompletableFuture<Object> inboundObserverCompletion;
+
+  public BeamFnLoggingClient(
+      PipelineOptions options,
+      BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
+      Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
+      BiFunction<Function<StreamObserver<BeamFnApi.LogControl>,
+                          StreamObserver<BeamFnApi.LogEntry.List>>,
+                 StreamObserver<BeamFnApi.LogControl>,
+                 StreamObserver<BeamFnApi.LogEntry.List>> streamObserverFactory) {
+    this.apiServiceDescriptor = apiServiceDescriptor;
+    this.inboundObserverCompletion = new CompletableFuture<>();
+    this.configuredLoggers = new ArrayList<>();
+    this.channel = channelFactory.apply(apiServiceDescriptor);
+
+    // Reset the global log manager, get the root logger and remove the default log handlers.
+    LogManager logManager = LogManager.getLogManager();
+    logManager.reset();
+    Logger rootLogger = logManager.getLogger(ROOT_LOGGER_NAME);
+    for (Handler handler : rootLogger.getHandlers()) {
+      rootLogger.removeHandler(handler);
+    }
+
+    // Use the passed in logging options to configure the various logger levels.
+    DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
+    if (loggingOptions.getDefaultWorkerLogLevel() != null) {
+      rootLogger.setLevel(LEVEL_CONFIGURATION.get(loggingOptions.getDefaultWorkerLogLevel()));
+    }
+
+    if (loggingOptions.getWorkerLogLevelOverrides() != null) {
+      for (Map.Entry<String, DataflowWorkerLoggingOptions.Level> loggerOverride :
+        loggingOptions.getWorkerLogLevelOverrides().entrySet()) {
+        Logger logger = Logger.getLogger(loggerOverride.getKey());
+        logger.setLevel(LEVEL_CONFIGURATION.get(loggerOverride.getValue()));
+        configuredLoggers.add(logger);
+      }
+    }
+
+    BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(channel);
+    inboundObserver = new LogControlObserver();
+    logRecordHandler = new LogRecordHandler(options.as(GcsOptions.class).getExecutorService());
+    logRecordHandler.setLevel(Level.ALL);
+    outboundObserver = streamObserverFactory.apply(stub::logging, inboundObserver);
+    rootLogger.addHandler(logRecordHandler);
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Hang up with the server
+    logRecordHandler.close();
+
+    // Wait for the server to hang up
+    inboundObserverCompletion.get();
+
+    // Reset the logging configuration to what it is at startup
+    for (Logger logger : configuredLoggers) {
+      logger.setLevel(null);
+    }
+    configuredLoggers.clear();
+    LogManager.getLogManager().readConfiguration();
+
+    // Shut the channel down
+    channel.shutdown();
+    if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
+      channel.shutdownNow();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(BeamFnLoggingClient.class)
+        .add("apiServiceDescriptor", apiServiceDescriptor)
+        .toString();
+  }
+
+  private class LogRecordHandler extends Handler implements Runnable {
+    private final BlockingDeque<BeamFnApi.LogEntry> bufferedLogEntries =
+        new LinkedBlockingDeque<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+    private final Future<?> bufferedLogWriter;
+    private final ThreadLocal<Consumer<BeamFnApi.LogEntry>> logEntryHandler;
+
+    private LogRecordHandler(ExecutorService executorService) {
+      bufferedLogWriter = executorService.submit(this);
+      logEntryHandler = new ThreadLocal<>();
+    }
+
+    @Override
+    public void publish(LogRecord record) {
+      BeamFnApi.LogEntry.Severity severity = LOG_LEVEL_MAP.get(record.getLevel());
+      if (severity == null) {
+        return;
+      }
+      BeamFnApi.LogEntry.Builder builder = BeamFnApi.LogEntry.newBuilder()
+          .setSeverity(severity)
+          .setLogLocation(record.getLoggerName())
+          .setMessage(FORMATTER.formatMessage(record))
+          .setThread(Integer.toString(record.getThreadID()))
+          .setTimestamp(Timestamp.newBuilder()
+              .setSeconds(record.getMillis() / 1000)
+              .setNanos((int) (record.getMillis() % 1000) * 1_000_000));
+      if (record.getThrown() != null) {
+        builder.setTrace(getStackTraceAsString(record.getThrown()));
+      }
+      // The thread that sends log records should never perform a blocking publish and
+      // only insert log records best effort. We detect which thread is logging
+      // by using the thread local, defaulting to the blocking publish.
+      MoreObjects.firstNonNull(
+          logEntryHandler.get(), this::blockingPublish).accept(builder.build());
+    }
+
+    /** Blocks caller till enough space exists to publish this log entry. */
+    private void blockingPublish(BeamFnApi.LogEntry logEntry) {
+      try {
+        bufferedLogEntries.put(logEntry);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void run() {
+      // Logging which occurs in this thread will attempt to publish log entries into the
+      // above handler which should never block if the queue is full otherwise
+      // this thread will get stuck.
+      logEntryHandler.set(bufferedLogEntries::offer);
+      List<BeamFnApi.LogEntry> additionalLogEntries =
+          new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+      try {
+        BeamFnApi.LogEntry logEntry;
+        while ((logEntry = bufferedLogEntries.take()) != POISON_PILL) {
+          BeamFnApi.LogEntry.List.Builder builder =
+              BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
+          bufferedLogEntries.drainTo(additionalLogEntries);
+          for (int i = 0; i < additionalLogEntries.size(); ++i) {
+            if (additionalLogEntries.get(i) == POISON_PILL) {
+              additionalLogEntries = additionalLogEntries.subList(0, i);
+              break;
+            }
+          }
+          builder.addAllLogEntries(additionalLogEntries);
+          outboundObserver.onNext(builder.build());
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e);
+      }
+    }
+
+    @Override
+    public void flush() {
+    }
+
+    @Override
+    public void close() {
+      synchronized (outboundObserver) {
+        // If we are done, then a previous caller has already shutdown the queue processing thread
+        // hence we don't need to do it again.
+        if (!bufferedLogWriter.isDone()) {
+          // We check to see if we were able to successfully insert the poison pill at the end of
+          // the queue forcing the remainder of the elements to be processed or if the processing
+          // thread is done.
+          try {
+            // The order of these checks is important because short circuiting will cause us to
+            // insert into the queue first and only if it fails do we check that the thread is done.
+            while (!bufferedLogEntries.offer(POISON_PILL, 60, TimeUnit.SECONDS)
+                || !bufferedLogWriter.isDone()) {
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+          waitTillFinish();
+        }
+        outboundObserver.onCompleted();
+      }
+    }
+
+    private void waitTillFinish() {
+      try {
+        bufferedLogWriter.get();
+      } catch (CancellationException e) {
+        // Ignore cancellations
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private class LogControlObserver implements StreamObserver<BeamFnApi.LogControl> {
+    @Override
+    public void onNext(BeamFnApi.LogControl value) {
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      inboundObserverCompletion.completeExceptionally(t);
+    }
+
+    @Override
+    public void onCompleted() {
+      inboundObserverCompletion.complete(null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java
new file mode 100644
index 0000000..7a4d0a8
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Logging service client and JUL logging handler adapter.
+ */
+package org.apache.beam.fn.harness.logging;

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java
new file mode 100644
index 0000000..58080e4
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Top level package for Beam Java Fn Harness.
+ */
+package org.apache.beam.fn.harness;

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java
new file mode 100644
index 0000000..2007139
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/AdvancingPhaser.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import java.util.concurrent.Phaser;
+
+/**
+ * A {@link Phaser} which never terminates. The default {@link Phaser} implementation terminates
+ * after the first advancement.
+ */
+public final class AdvancingPhaser extends Phaser {
+  public AdvancingPhaser(int numParties) {
+    super(numParties);
+  }
+
+  @Override
+  protected boolean onAdvance(int phase, int registeredParties) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
new file mode 100644
index 0000000..cda3a4b
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/BufferingStreamObserver.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A thread safe {@link StreamObserver} which uses a bounded queue to pass elements to a processing
+ * thread responsible for interacting with the underlying {@link CallStreamObserver}.
+ *
+ * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
+ * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready. Callers
+ * are expected to advance the {@link Phaser} whenever the underlying {@link CallStreamObserver}
+ * becomes ready.
+ */
+@ThreadSafe
+public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+  private static final Object POISON_PILL = new Object();
+  private final LinkedBlockingDeque<T> queue;
+  private final Phaser phaser;
+  private final CallStreamObserver<T> outboundObserver;
+  private final Future<?> queueDrainer;
+  private final int bufferSize;
+
+  public BufferingStreamObserver(
+      Phaser phaser,
+      CallStreamObserver<T> outboundObserver,
+      ExecutorService executor,
+      int bufferSize) {
+    this.phaser = phaser;
+    this.bufferSize = bufferSize;
+    this.queue = new LinkedBlockingDeque<>(bufferSize);
+    this.outboundObserver = outboundObserver;
+    this.queueDrainer = executor.submit(this::drainQueue);
+  }
+
+  private void drainQueue() {
+    try {
+      while (true) {
+        int currentPhase = phaser.getPhase();
+        while (outboundObserver.isReady()) {
+          T value = queue.take();
+          if (value != POISON_PILL) {
+            outboundObserver.onNext(value);
+          } else {
+            return;
+          }
+        }
+        phaser.awaitAdvance(currentPhase);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public void onNext(T value) {
+    try {
+      // Attempt to add an element to the bounded queue occasionally checking to see
+      // if the queue drainer is still alive.
+      while (!queue.offer(value, 60, TimeUnit.SECONDS)) {
+        checkState(!queueDrainer.isDone(), "Stream observer has finished.");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    synchronized (outboundObserver) {
+      // If we are done, then a previous caller has already shutdown the queue processing thread
+      // hence we don't need to do it again.
+      if (!queueDrainer.isDone()) {
+        // We check to see if we were able to successfully insert the poison pill at the front of
+        // the queue to cancel the processing thread eagerly or if the processing thread is done.
+        try {
+          // The order of these checks is important because short circuiting will cause us to
+          // insert into the queue first and only if it fails do we check that the thread is done.
+          while (!queue.offerFirst((T) POISON_PILL, 60, TimeUnit.SECONDS)
+              || !queueDrainer.isDone()) {
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+        waitTillFinish();
+      }
+      outboundObserver.onError(t);
+    }
+  }
+
+  @Override
+  public void onCompleted() {
+    synchronized (outboundObserver) {
+      // If we are done, then a previous caller has already shutdown the queue processing thread
+      // hence we don't need to do it again.
+      if (!queueDrainer.isDone()) {
+        // We check to see if we were able to successfully insert the poison pill at the end of
+        // the queue forcing the remainder of the elements to be processed or if the processing
+        // thread is done.
+        try {
+          // The order of these checks is important because short circuiting will cause us to
+          // insert into the queue first and only if it fails do we check that the thread is done.
+          while (!queue.offer((T) POISON_PILL, 60, TimeUnit.SECONDS)
+              || !queueDrainer.isDone()) {
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+        waitTillFinish();
+      }
+      outboundObserver.onCompleted();
+    }
+  }
+
+  @VisibleForTesting
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  private void waitTillFinish() {
+    try {
+      queueDrainer.get();
+    } catch (CancellationException e) {
+      // Cancellation is expected
+      return;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
new file mode 100644
index 0000000..82a1aa4
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DirectStreamObserver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.Phaser;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A {@link StreamObserver} which uses synchronization on the underlying
+ * {@link CallStreamObserver} to provide thread safety.
+ *
+ * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
+ * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready.
+ * Creator is expected to advance the {@link Phaser} whenever the underlying
+ * {@link CallStreamObserver} becomes ready.
+ */
+@ThreadSafe
+public final class DirectStreamObserver<T> implements StreamObserver<T> {
+  private final Phaser phaser;
+  private final CallStreamObserver<T> outboundObserver;
+
+  public DirectStreamObserver(
+      Phaser phaser,
+      CallStreamObserver<T> outboundObserver) {
+    this.phaser = phaser;
+    this.outboundObserver = outboundObserver;
+  }
+
+  @Override
+  public void onNext(T value) {
+    int phase = phaser.getPhase();
+    if (!outboundObserver.isReady()) {
+      phaser.awaitAdvance(phase);
+    }
+    synchronized (outboundObserver) {
+      outboundObserver.onNext(value);
+    }
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    synchronized (outboundObserver) {
+      outboundObserver.onError(t);
+    }
+  }
+
+  @Override
+  public void onCompleted() {
+    synchronized (outboundObserver) {
+      outboundObserver.onCompleted();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java
new file mode 100644
index 0000000..ef641b0
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/ForwardingClientResponseObserver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import io.grpc.stub.StreamObserver;
+
+/**
+ * A {@link ClientResponseObserver} which delegates all {@link StreamObserver} calls.
+ *
+ * <p>Used to wrap existing {@link StreamObserver}s to be able to install an
+ * {@link ClientCallStreamObserver#setOnReadyHandler(Runnable) onReadyHandler}.
+ *
+ * <p>This is as thread-safe as the undering stream observer that is being wrapped.
+ */
+final class ForwardingClientResponseObserver<ReqT, RespT>
+    implements ClientResponseObserver<RespT, ReqT> {
+  private final Runnable onReadyHandler;
+  private final StreamObserver<ReqT> inboundObserver;
+
+  ForwardingClientResponseObserver(
+      StreamObserver<ReqT> inboundObserver, Runnable onReadyHandler) {
+    this.inboundObserver = inboundObserver;
+    this.onReadyHandler = onReadyHandler;
+  }
+
+  @Override
+  public void onNext(ReqT value) {
+    inboundObserver.onNext(value);
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    inboundObserver.onError(t);
+  }
+
+  @Override
+  public void onCompleted() {
+    inboundObserver.onCompleted();
+  }
+
+  @Override
+  public void beforeStart(ClientCallStreamObserver<RespT> stream) {
+    stream.setOnReadyHandler(onReadyHandler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
new file mode 100644
index 0000000..9326e11
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/StreamObserverFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.fn.harness.stream;
+
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Uses {@link PipelineOptions} to configure which underlying {@link StreamObserver} implementation
+ * to use.
+ */
+public abstract class StreamObserverFactory {
+  public static StreamObserverFactory fromOptions(PipelineOptions options) {
+    List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
+    if (experiments != null && experiments.contains("beam_fn_api_buffered_stream")) {
+      int bufferSize = Buffered.DEFAULT_BUFFER_SIZE;
+      for (String experiment : experiments) {
+        if (experiment.startsWith("beam_fn_api_buffered_stream_buffer_size=")) {
+          bufferSize = Integer.parseInt(
+              experiment.substring("beam_fn_api_buffered_stream_buffer_size=".length()));
+        }
+      }
+      return new Buffered(options.as(GcsOptions.class).getExecutorService(), bufferSize);
+    }
+    return new Direct();
+  }
+
+  public abstract <ReqT, RespT> StreamObserver<RespT> from(
+      Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
+      StreamObserver<ReqT> responseObserver);
+
+  private static class Direct extends StreamObserverFactory {
+
+    @Override
+    public <ReqT, RespT> StreamObserver<RespT> from(
+        Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
+        StreamObserver<ReqT> inboundObserver) {
+      AdvancingPhaser phaser = new AdvancingPhaser(1);
+      CallStreamObserver<RespT> outboundObserver = (CallStreamObserver<RespT>) clientFactory.apply(
+          new ForwardingClientResponseObserver<ReqT, RespT>(
+              inboundObserver, phaser::arrive));
+      return new DirectStreamObserver<>(phaser, outboundObserver);
+    }
+  }
+
+  private static class Buffered extends StreamObserverFactory {
+    private static final int DEFAULT_BUFFER_SIZE = 64;
+    private final ExecutorService executorService;
+    private final int bufferSize;
+
+    private Buffered(ExecutorService executorService, int bufferSize) {
+      this.executorService = executorService;
+      this.bufferSize = bufferSize;
+    }
+
+    @Override
+    public <ReqT, RespT> StreamObserver<RespT> from(
+        Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
+        StreamObserver<ReqT> inboundObserver) {
+      AdvancingPhaser phaser = new AdvancingPhaser(1);
+      CallStreamObserver<RespT> outboundObserver = (CallStreamObserver<RespT>) clientFactory.apply(
+          new ForwardingClientResponseObserver<ReqT, RespT>(
+              inboundObserver, phaser::arrive));
+      return new BufferingStreamObserver<>(
+          phaser, outboundObserver, executorService, bufferSize);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java
new file mode 100644
index 0000000..df4042c
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gRPC stream management.
+ */
+package org.apache.beam.fn.harness.stream;