You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/11/23 00:17:23 UTC

[2/4] beam git commit: Move Common Fn Execution Concepts to fn-execution

Move Common Fn Execution Concepts to fn-execution

Move Stream control packages to fn-execution. Update java package names.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9e2be91
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9e2be91
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9e2be91

Branch: refs/heads/master
Commit: f9e2be91836fd7fb25ed772a35d9626a7c5a2cc6
Parents: cbb3a73
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 9 18:31:26 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 22 16:16:41 2017 -0800

----------------------------------------------------------------------
 runners/java-fn-execution/pom.xml               |  29 ---
 .../beam/runners/fnexecution/ServerFactory.java |   2 +-
 .../runners/fnexecution/ServerFactoryTest.java  |   6 +-
 sdks/java/fn-execution/build.gradle             |  10 +-
 sdks/java/fn-execution/pom.xml                  |  49 ++--
 .../beam/sdk/fn/stream/AdvancingPhaser.java     |  36 +++
 .../sdk/fn/stream/BufferingStreamObserver.java  | 171 +++++++++++++
 .../apache/beam/sdk/fn/stream/DataStreams.java  | 249 +++++++++++++++++++
 .../sdk/fn/stream/DirectStreamObserver.java     |  71 ++++++
 .../ForwardingClientResponseObserver.java       |  68 +++++
 .../apache/beam/sdk/fn/stream/package-info.java |  22 ++
 .../org/apache/beam/harness/test/Consumer.java  |  26 --
 .../org/apache/beam/harness/test/Supplier.java  |  26 --
 .../apache/beam/harness/test/TestExecutors.java |  93 -------
 .../beam/harness/test/TestExecutorsTest.java    | 175 -------------
 .../apache/beam/harness/test/TestStreams.java   | 185 --------------
 .../beam/harness/test/TestStreamsTest.java      | 109 --------
 .../beam/sdk/fn/stream/AdvancingPhaserTest.java |  53 ++++
 .../fn/stream/BufferingStreamObserverTest.java  | 155 ++++++++++++
 .../beam/sdk/fn/stream/DataStreamsTest.java     | 167 +++++++++++++
 .../sdk/fn/stream/DirectStreamObserverTest.java | 145 +++++++++++
 .../ForwardingClientResponseObserverTest.java   |  60 +++++
 .../org/apache/beam/sdk/fn/test/Consumer.java   |  26 ++
 .../org/apache/beam/sdk/fn/test/Supplier.java   |  26 ++
 .../apache/beam/sdk/fn/test/TestExecutors.java  |  93 +++++++
 .../beam/sdk/fn/test/TestExecutorsTest.java     | 175 +++++++++++++
 .../apache/beam/sdk/fn/test/TestStreams.java    | 185 ++++++++++++++
 .../beam/sdk/fn/test/TestStreamsTest.java       | 109 ++++++++
 .../org/apache/beam/fn/harness/FnHarness.java   |   2 +-
 .../beam/fn/harness/state/BagUserState.java     |   2 +-
 .../beam/fn/harness/stream/AdvancingPhaser.java |  36 ---
 .../harness/stream/BufferingStreamObserver.java | 166 -------------
 .../beam/fn/harness/stream/DataStreams.java     | 229 -----------------
 .../fn/harness/stream/DirectStreamObserver.java |  71 ------
 .../ForwardingClientResponseObserver.java       |  63 -----
 .../harness/stream/StreamObserverFactory.java   |  20 +-
 .../fn/harness/BeamFnDataReadRunnerTest.java    |   4 +-
 .../apache/beam/fn/harness/FnHarnessTest.java   |   4 +-
 .../control/BeamFnControlClientTest.java        |   2 +-
 .../fn/harness/control/RegisterHandlerTest.java |   4 +-
 ...BeamFnDataBufferingOutboundObserverTest.java |   2 +-
 .../harness/data/BeamFnDataGrpcClientTest.java  |   4 +-
 .../data/BeamFnDataGrpcMultiplexerTest.java     |   2 +-
 .../logging/BeamFnLoggingClientTest.java        |   2 +-
 .../state/BeamFnStateGrpcClientCacheTest.java   |   2 +-
 .../fn/harness/stream/AdvancingPhaserTest.java  |  48 ----
 .../stream/BufferingStreamObserverTest.java     | 146 -----------
 .../beam/fn/harness/stream/DataStreamsTest.java | 167 -------------
 .../stream/DirectStreamObserverTest.java        | 139 -----------
 .../ForwardingClientResponseObserverTest.java   |  60 -----
 .../stream/StreamObserverFactoryTest.java       |   3 +
 51 files changed, 1865 insertions(+), 1834 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/runners/java-fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
index 6ff08b7..3ebcfd0 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -32,35 +32,6 @@
 
   <packaging>jar</packaging>
 
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-enforcer-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>enforce-banned-dependencies</id>
-            <goals>
-              <goal>enforce</goal>
-            </goals>
-            <configuration>
-              <rules>
-                <bannedDependencies>
-                  <excludes>
-                    <exclude>com.google.guava:guava-jdk5</exclude>
-                    <exclude>com.google.protobuf:protobuf-lite</exclude>
-                    <exclude>org.apache.beam:beam-sdks-java-core</exclude>
-                  </excludes>
-                </bannedDependencies>
-              </rules>
-              <fail>true</fail>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
index 93c787d..bb45d08 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -28,8 +28,8 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
 
 /**
  * A {@link Server gRPC server} factory.

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
index e0d7bf9..e9b5fa6 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -38,13 +38,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
-import org.apache.beam.harness.test.Consumer;
-import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
 import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.fn.test.Consumer;
+import org.apache.beam.sdk.fn.test.TestStreams;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/build.gradle
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/build.gradle b/sdks/java/fn-execution/build.gradle
index 1ffe428..69ec54a 100644
--- a/sdks/java/fn-execution/build.gradle
+++ b/sdks/java/fn-execution/build.gradle
@@ -21,17 +21,10 @@ applyJavaNature()
 
 description = "Apache Beam :: SDKs :: Java :: Fn Execution"
 
-configurations.all {
-  // Fn Execution contains shared utilities for Runners and Harnesses which use
-  // the Portability framework. Runner-side interactions must not require a
-  // dependency on any particular SDK, so this library must not introduce such an
-  // edge.
-  exclude group: "org.apache.beam", module: "beam-sdks-java-core"
-}
-
 dependencies {
   compile library.java.guava
   shadow project(path: ":beam-model-parent:beam-model-pipeline", configuration: "shadow")
+  shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-core", configuration: "shadow")
   shadow library.java.grpc_core
   shadow library.java.grpc_stub
   shadow library.java.grpc_netty
@@ -39,6 +32,7 @@ dependencies {
   testCompile library.java.junit
   testCompile library.java.hamcrest_core
   testCompile library.java.hamcrest_library
+  testCompile library.java.mockito_core
 }
 
 task packageTests(type: Jar) {

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/pom.xml b/sdks/java/fn-execution/pom.xml
index 3bdec38..773873e 100644
--- a/sdks/java/fn-execution/pom.xml
+++ b/sdks/java/fn-execution/pom.xml
@@ -34,39 +34,6 @@
 
   <packaging>jar</packaging>
 
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-enforcer-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>enforce-banned-dependencies</id>
-            <goals>
-              <goal>enforce</goal>
-            </goals>
-            <configuration>
-              <rules>
-                <bannedDependencies>
-                  <excludes>
-                    <exclude>com.google.guava:guava-jdk5</exclude>
-                    <exclude>com.google.protobuf:protobuf-lite</exclude>
-                    <!-- Fn Execution contains shared utilities for Runners and Harnesses which use
-                    the Portability framework. Runner-side interactions must not require a
-                    dependency on any particular SDK, so this library must not introduce such an
-                    edge. -->
-                    <exclude>org.apache.beam:beam-sdks-java-core</exclude>
-                  </excludes>
-                </bannedDependencies>
-              </rules>
-              <fail>true</fail>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
@@ -74,6 +41,16 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-core</artifactId>
     </dependency>
@@ -111,5 +88,11 @@
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java
new file mode 100644
index 0000000..c091705
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/AdvancingPhaser.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import java.util.concurrent.Phaser;
+
+/**
+ * A {@link Phaser} which never terminates. The default {@link Phaser} implementation terminates
+ * once an advancement leaves it with zero registered parties.
+ */
+public final class AdvancingPhaser extends Phaser {
+  public AdvancingPhaser(int numParties) {
+    super(numParties);
+  }
+
+  @Override
+  protected boolean onAdvance(int phase, int registeredParties) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
new file mode 100644
index 0000000..b541e5f
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserver.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A thread safe {@link StreamObserver} which uses a bounded queue to pass elements to a processing
+ * thread responsible for interacting with the underlying {@link CallStreamObserver}.
+ *
+ * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
+ * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready. Callers
+ * are expected to advance the {@link Phaser} whenever the underlying {@link CallStreamObserver}
+ * becomes ready.
+ */
+@ThreadSafe
+public final class BufferingStreamObserver<T> implements StreamObserver<T> {
+  private static final Object POISON_PILL = new Object();
+  private final LinkedBlockingDeque<T> queue;
+  private final Phaser phaser;
+  private final CallStreamObserver<T> outboundObserver;
+  private final Future<?> queueDrainer;
+  private final int bufferSize;
+
+  public BufferingStreamObserver(
+      Phaser phaser,
+      CallStreamObserver<T> outboundObserver,
+      ExecutorService executor,
+      int bufferSize) {
+    this.phaser = phaser;
+    this.bufferSize = bufferSize;
+    this.queue = new LinkedBlockingDeque<>(bufferSize);
+    this.outboundObserver = outboundObserver;
+    this.queueDrainer =
+        executor.submit(
+            new Runnable() {
+              @Override
+              public void run() {
+                drainQueue();
+              }
+            });
+  }
+
+  private void drainQueue() {
+    try {
+      while (true) {
+        int currentPhase = phaser.getPhase();
+        while (outboundObserver.isReady()) {
+          T value = queue.take();
+          if (value != POISON_PILL) {
+            outboundObserver.onNext(value);
+          } else {
+            return;
+          }
+        }
+        phaser.awaitAdvance(currentPhase);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public void onNext(T value) {
+    try {
+      // Attempt to add an element to the bounded queue, occasionally checking to see
+      // if the queue drainer is still alive.
+      while (!queue.offer(value, 60, TimeUnit.SECONDS)) {
+        checkState(!queueDrainer.isDone(), "Stream observer has finished.");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    synchronized (outboundObserver) {
+      // If we are done, then a previous caller has already shutdown the queue processing thread
+      // hence we don't need to do it again.
+      if (!queueDrainer.isDone()) {
+        // We check to see if we were able to successfully insert the poison pill at the front of
+        // the queue to cancel the processing thread eagerly or if the processing thread is done.
+        try {
+          // We shouldn't attempt to insert into the queue if the queue drainer thread is done
+          // since the queue may be full and nothing will be emptying it.
+          while (!queueDrainer.isDone()
+              && !queue.offerFirst((T) POISON_PILL, 60, TimeUnit.SECONDS)) {}
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+        waitTillFinish();
+      }
+      outboundObserver.onError(t);
+    }
+  }
+
+  @Override
+  public void onCompleted() {
+    synchronized (outboundObserver) {
+      // If we are done, then a previous caller has already shutdown the queue processing thread
+      // hence we don't need to do it again.
+      if (!queueDrainer.isDone()) {
+        // We check to see if we were able to successfully insert the poison pill at the end of
+        // the queue forcing the remainder of the elements to be processed or if the processing
+        // thread is done.
+        try {
+          // We shouldn't attempt to insert into the queue if the queue drainer thread is done
+          // since the queue may be full and nothing will be emptying it.
+          while (!queueDrainer.isDone()
+              && !queue.offerLast((T) POISON_PILL, 60, TimeUnit.SECONDS)) {}
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+        waitTillFinish();
+      }
+      outboundObserver.onCompleted();
+    }
+  }
+
+  @VisibleForTesting
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  private void waitTillFinish() {
+    try {
+      queueDrainer.get();
+    } catch (CancellationException e) {
+      // Cancellation is expected
+      return;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
new file mode 100644
index 0000000..35abc4c
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingInputStream;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PushbackInputStream;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * {@link #inbound(Iterator)} treats multiple {@link ByteString}s as a single input stream and
+ * {@link #outbound(OutputChunkConsumer)} treats a single {@link OutputStream} as multiple
+ * {@link ByteString}s.
+ */
+// TODO: Migrate logic from BeamFnDataBufferingOutboundObserver to support Outbound
+public class DataStreams {
+  /**
+   * Converts multiple {@link ByteString}s into a single {@link InputStream}.
+   *
+   * <p>The iterator is accessed lazily. The supplied {@link Iterator} should block until
+   * either it knows that no more values will be provided or it has the next {@link ByteString}.
+   */
+  public static InputStream inbound(Iterator<ByteString> bytes) {
+    return new Inbound(bytes);
+  }
+
+  /**
+   * Converts a single {@link OutputStream} into multiple {@link ByteString ByteStrings}.
+   */
+  public static OutputStream outbound(OutputChunkConsumer<ByteString> consumer) {
+    // TODO: Migrate logic from BeamFnDataBufferingOutboundObserver
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Reads chunks of output.
+   *
+   * @deprecated Used as a temporary placeholder until implementation of
+   * {@link #outbound(OutputChunkConsumer)}.
+   */
+  @Deprecated
+  public interface OutputChunkConsumer<T> {
+    void read(T chunk) throws Exception;
+  }
+
+  /**
+   * An input stream which concatenates multiple {@link ByteString}s. Lazily accesses the
+   * first element of the {@link Iterator} on first access of this input stream.
+   *
+   * <p>Closing this input stream has no effect.
+   */
+  private static class Inbound<T> extends InputStream {
+    private static final InputStream EMPTY_STREAM = new InputStream() {
+      @Override
+      public int read() throws IOException {
+        return -1;
+      }
+    };
+
+    private final Iterator<ByteString> bytes;
+    private InputStream currentStream;
+
+    public Inbound(Iterator<ByteString> bytes) {
+      this.currentStream = EMPTY_STREAM;
+      this.bytes = bytes;
+    }
+
+    @Override
+    public int read() throws IOException {
+      int rval = -1;
+      // Move on to the next stream if we have read nothing
+      while ((rval = currentStream.read()) == -1 && bytes.hasNext()) {
+        currentStream = bytes.next().newInput();
+      }
+      return rval;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      int remainingLen = len;
+      while ((remainingLen -= ByteStreams.read(
+          currentStream, b, off + len - remainingLen, remainingLen)) > 0) {
+        if (bytes.hasNext()) {
+          currentStream = bytes.next().newInput();
+        } else {
+          int bytesRead = len - remainingLen;
+          return bytesRead > 0 ? bytesRead : -1;
+        }
+      }
+      return len - remainingLen;
+    }
+  }
+
+  /**
+   * An adapter which converts an {@link InputStream} to an {@link Iterator} of {@code T} values
+   * using the specified {@link Coder}.
+   *
+   * <p>Note that this adapter follows the Beam Fn API specification by forcing values that decode
+   * while consuming zero bytes to consume exactly one byte.
+   *
+   * <p>Note that access to the underlying {@link InputStream} is lazy and will only be invoked on
+   * first access to {@link #next()} or {@link #hasNext()}.
+   */
+  public static class DataStreamDecoder<T> implements Iterator<T> {
+    private enum State { READ_REQUIRED, HAS_NEXT, EOF };
+
+    private final CountingInputStream countingInputStream;
+    private final PushbackInputStream pushbackInputStream;
+    private final Coder<T> coder;
+    private State currentState;
+    private T next;
+    public DataStreamDecoder(Coder<T> coder, InputStream inputStream) {
+      this.currentState = State.READ_REQUIRED;
+      this.coder = coder;
+      this.pushbackInputStream = new PushbackInputStream(inputStream, 1);
+      this.countingInputStream = new CountingInputStream(pushbackInputStream);
+    }
+
+    @Override
+    public boolean hasNext() {
+      switch (currentState) {
+        case EOF:
+          return false;
+        case READ_REQUIRED:
+          try {
+            int nextByte = pushbackInputStream.read();
+            if (nextByte == -1) {
+              currentState = State.EOF;
+              return false;
+            }
+
+            pushbackInputStream.unread(nextByte);
+            long count = countingInputStream.getCount();
+            next = coder.decode(countingInputStream);
+            // Skip one byte if decoding the value consumed 0 bytes.
+            if (countingInputStream.getCount() - count == 0) {
+              checkState(countingInputStream.read() != -1, "Unexpected EOF reached");
+            }
+            currentState = State.HAS_NEXT;
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+          return true;
+        case HAS_NEXT:
+          return true;
+      }
+      throw new IllegalStateException(String.format("Unknown state %s", currentState));
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      currentState = State.READ_REQUIRED;
+      return next;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Allows for one or more writing threads to append values to this iterator while one reading
+   * thread reads values. {@link #hasNext()} and {@link #next()} will block until a value is
+   * available or this has been closed.
+   *
+   * <p>External synchronization must be provided if multiple readers would like to access the
+   * {@link Iterator#hasNext()} and {@link Iterator#next()} methods.
+   *
+   * <p>The order of values which are appended to this iterator is nondeterministic when multiple
+   * threads call {@link #accept(Object)}.
+   */
+  public static class BlockingQueueIterator<T>
+      implements AutoCloseable, Iterator<T> {
+    private static final Object POISION_PILL = new Object();
+    private final BlockingQueue<T> queue;
+
+    /** Only accessed by {@link Iterator#hasNext()} and {@link Iterator#next()} methods. */
+    private T currentElement;
+
+    public BlockingQueueIterator(BlockingQueue<T> queue) {
+      this.queue = queue;
+    }
+
+    @Override
+    public void close() throws Exception {
+      queue.put((T) POISION_PILL);
+    }
+
+    public void accept(T t) throws Exception {
+      queue.put(t);
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (currentElement == null) {
+        try {
+          currentElement = queue.take();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
+        }
+      }
+      return currentElement != POISION_PILL;
+    }
+
+    @Override
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      T rval = currentElement;
+      currentElement = null;
+      return rval;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
new file mode 100644
index 0000000..eb7183f
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.Phaser;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A {@link StreamObserver} which uses synchronization on the underlying
+ * {@link CallStreamObserver} to provide thread safety.
+ *
+ * <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
+ * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready.
+ * The creator is expected to advance the {@link Phaser} whenever the underlying
+ * {@link CallStreamObserver} becomes ready.
+ */
+@ThreadSafe
+public final class DirectStreamObserver<T> implements StreamObserver<T> {
+  private final Phaser phaser;
+  private final CallStreamObserver<T> outboundObserver;
+
+  public DirectStreamObserver(
+      Phaser phaser,
+      CallStreamObserver<T> outboundObserver) {
+    this.phaser = phaser;
+    this.outboundObserver = outboundObserver;
+  }
+
+  @Override
+  public void onNext(T value) {
+    int phase = phaser.getPhase();
+    if (!outboundObserver.isReady()) {
+      phaser.awaitAdvance(phase);
+    }
+    synchronized (outboundObserver) {
+      outboundObserver.onNext(value);
+    }
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    synchronized (outboundObserver) {
+      outboundObserver.onError(t);
+    }
+  }
+
+  @Override
+  public void onCompleted() {
+    synchronized (outboundObserver) {
+      outboundObserver.onCompleted();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java
new file mode 100644
index 0000000..958c69b
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserver.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import io.grpc.stub.StreamObserver;
+
+/**
+ * A {@link ClientResponseObserver} which delegates all {@link StreamObserver} calls.
+ *
+ * <p>Used to wrap existing {@link StreamObserver}s to be able to install an
+ * {@link ClientCallStreamObserver#setOnReadyHandler(Runnable) onReadyHandler}.
+ *
+ * <p>This is as thread-safe as the underlying stream observer that is being wrapped.
+ */
+public final class ForwardingClientResponseObserver<ReqT, RespT>
+    implements ClientResponseObserver<RespT, ReqT> {
+  public static <ReqT, RespT> ForwardingClientResponseObserver<ReqT, RespT> create(
+      StreamObserver<ReqT> inbound, Runnable onReadyHandler) {
+    return new ForwardingClientResponseObserver<>(inbound, onReadyHandler);
+  }
+
+  private final Runnable onReadyHandler;
+  private final StreamObserver<ReqT> inboundObserver;
+
+  ForwardingClientResponseObserver(
+      StreamObserver<ReqT> inboundObserver, Runnable onReadyHandler) {
+    this.inboundObserver = inboundObserver;
+    this.onReadyHandler = onReadyHandler;
+  }
+
+  @Override
+  public void onNext(ReqT value) {
+    inboundObserver.onNext(value);
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    inboundObserver.onError(t);
+  }
+
+  @Override
+  public void onCompleted() {
+    inboundObserver.onCompleted();
+  }
+
+  @Override
+  public void beforeStart(ClientCallStreamObserver<RespT> stream) {
+    stream.setOnReadyHandler(onReadyHandler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java
new file mode 100644
index 0000000..6aa2729
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gRPC stream management.
+ */
+package org.apache.beam.sdk.fn.stream;

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
deleted file mode 100644
index 279fc29..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-/**
- * A fork of the Java 8 consumer interface. This exists to enable migration for existing consumers.
- */
-public interface Consumer<T> {
-  void accept(T item);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
deleted file mode 100644
index 629afc2..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-/**
- * A fork of the Java 8 Supplier interface, to enable migrations.
- */
-public interface Supplier<T> {
-  T get();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
deleted file mode 100644
index ca12d5a..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-import com.google.common.util.concurrent.ForwardingExecutorService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-/**
- * A {@link TestRule} that validates that all submitted tasks finished and were completed. This
- * allows for testing that tasks have exercised the appropriate shutdown logic.
- */
-public class TestExecutors {
-  public static TestExecutorService from(final ExecutorService staticExecutorService) {
-    return from(new Supplier<ExecutorService>() {
-      @Override
-      public ExecutorService get() {
-        return staticExecutorService;
-      }
-    });
-  }
-
-  public static TestExecutorService from(Supplier<ExecutorService> executorServiceSuppler) {
-    return new FromSupplier(executorServiceSuppler);
-  }
-
-  /** A union of the {@link ExecutorService} and {@link TestRule} interfaces. */
-  public interface TestExecutorService extends ExecutorService, TestRule {}
-
-  private static class FromSupplier extends ForwardingExecutorService
-      implements TestExecutorService {
-    private final Supplier<ExecutorService> executorServiceSupplier;
-    private ExecutorService delegate;
-
-    private FromSupplier(Supplier<ExecutorService> executorServiceSupplier) {
-      this.executorServiceSupplier = executorServiceSupplier;
-    }
-
-    @Override
-    public Statement apply(final Statement statement, Description arg1) {
-      return new Statement() {
-        @Override
-        public void evaluate() throws Throwable {
-          Throwable thrown = null;
-          delegate = executorServiceSupplier.get();
-          try {
-            statement.evaluate();
-          } catch (Throwable t) {
-            thrown = t;
-          }
-          shutdown();
-          if (!awaitTermination(5, TimeUnit.SECONDS)) {
-            shutdownNow();
-            IllegalStateException e =
-                new IllegalStateException("Test executor failed to shutdown cleanly.");
-            if (thrown != null) {
-              thrown.addSuppressed(e);
-            } else {
-              thrown = e;
-            }
-          }
-          if (thrown != null) {
-            throw thrown;
-          }
-        }
-      };
-    }
-
-    @Override
-    protected ExecutorService delegate() {
-      return delegate;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
deleted file mode 100644
index f0c98e0..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.junit.runners.model.Statement;
-
-/** Tests for {@link TestExecutors}. */
-@RunWith(JUnit4.class)
-public class TestExecutorsTest {
-  @Test
-  public void testSuccessfulTermination() throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(service);
-    final AtomicBoolean taskRan = new AtomicBoolean();
-    testService
-        .apply(
-            new Statement() {
-              @Override
-              public void evaluate() throws Throwable {
-                testService.submit(new Runnable() {
-                  @Override
-                  public void run() {
-                    taskRan.set(true);
-                  }
-                });
-              }
-            },
-            null)
-        .evaluate();
-    assertTrue(service.isTerminated());
-    assertTrue(taskRan.get());
-  }
-
-  @Test
-  public void testTaskBlocksForeverCausesFailure() throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(service);
-    final AtomicBoolean taskStarted = new AtomicBoolean();
-    final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
-    try {
-      testService
-          .apply(
-              new Statement() {
-                @Override
-                public void evaluate() throws Throwable {
-                  testService.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                      taskToRun();
-                    }
-                  });
-                }
-
-                private void taskToRun() {
-                  taskStarted.set(true);
-                  try {
-                    while (true) {
-                      Thread.sleep(10000);
-                    }
-                  } catch (InterruptedException e) {
-                    taskWasInterrupted.set(true);
-                    return;
-                  }
-                }
-              },
-              null)
-          .evaluate();
-      fail();
-    } catch (IllegalStateException e) {
-      assertEquals(IllegalStateException.class, e.getClass());
-      assertEquals("Test executor failed to shutdown cleanly.", e.getMessage());
-    }
-    assertTrue(service.isShutdown());
-  }
-
-  @Test
-  public void testStatementFailurePropagatedCleanly() throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(service);
-    final RuntimeException exceptionToThrow = new RuntimeException();
-    try {
-      testService
-          .apply(
-              new Statement() {
-                @Override
-                public void evaluate() throws Throwable {
-                  throw exceptionToThrow;
-                }
-              },
-              null)
-          .evaluate();
-      fail();
-    } catch (RuntimeException thrownException) {
-      assertSame(exceptionToThrow, thrownException);
-    }
-    assertTrue(service.isShutdown());
-  }
-
-  @Test
-  public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate()
-      throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(service);
-    final AtomicBoolean taskStarted = new AtomicBoolean();
-    final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
-    final RuntimeException exceptionToThrow = new RuntimeException();
-    try {
-      testService
-          .apply(
-              new Statement() {
-                @Override
-                public void evaluate() throws Throwable {
-                  testService.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                      taskToRun();
-                    }
-                  });
-                  throw exceptionToThrow;
-                }
-
-                private void taskToRun() {
-                  taskStarted.set(true);
-                  try {
-                    while (true) {
-                      Thread.sleep(10000);
-                    }
-                  } catch (InterruptedException e) {
-                    taskWasInterrupted.set(true);
-                    return;
-                  }
-                }
-              },
-              null)
-          .evaluate();
-      fail();
-    } catch (RuntimeException thrownException) {
-      assertSame(exceptionToThrow, thrownException);
-      assertEquals(1, exceptionToThrow.getSuppressed().length);
-      assertEquals(IllegalStateException.class, exceptionToThrow.getSuppressed()[0].getClass());
-      assertEquals(
-          "Test executor failed to shutdown cleanly.",
-          exceptionToThrow.getSuppressed()[0].getMessage());
-    }
-    assertTrue(service.isShutdown());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
deleted file mode 100644
index 3df743a..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-import io.grpc.stub.CallStreamObserver;
-import io.grpc.stub.StreamObserver;
-
-/** Utility methods which enable testing of {@link StreamObserver}s. */
-public class TestStreams {
-  /**
-   * Creates a test {@link CallStreamObserver}  {@link Builder} that forwards
-   * {@link StreamObserver#onNext} calls to the supplied {@link Consumer}.
-   */
-  public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
-    return new Builder<>(new ForwardingCallStreamObserver<>(
-        onNext,
-        TestStreams.<Throwable>noopConsumer(),
-        TestStreams.noopRunnable(),
-        TestStreams.alwaysTrueSupplier()));
-  }
-
-  /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */
-  public static class Builder<T> {
-    private final ForwardingCallStreamObserver<T> observer;
-    private Builder(ForwardingCallStreamObserver<T> observer) {
-      this.observer = observer;
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link CallStreamObserver#isReady} callback.
-     */
-    public Builder<T> withIsReady(Supplier<Boolean> isReady) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext,
-          observer.onError,
-          observer.onCompleted,
-          isReady));
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link StreamObserver#onCompleted} callback.
-     */
-    public Builder<T> withOnCompleted(Runnable onCompleted) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext,
-          observer.onError,
-          onCompleted,
-          observer.isReady));
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link StreamObserver#onError} callback.
-     */
-    public Builder<T> withOnError(final Runnable onError) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext,
-          new Consumer<Throwable>() {
-            @Override
-            public void accept(Throwable t) {
-              onError.run();
-            }
-          },
-          observer.onCompleted,
-          observer.isReady));
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link StreamObserver#onError} consumer.
-     */
-    public Builder<T> withOnError(Consumer<Throwable> onError) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext, onError, observer.onCompleted, observer.isReady));
-    }
-
-    public CallStreamObserver<T> build() {
-      return observer;
-    }
-  }
-
-  private static void noop() {
-  }
-
-  private static Runnable noopRunnable() {
-    return new Runnable() {
-      @Override
-      public void run() {
-      }
-    };
-  }
-
-  private static void noop(Throwable t) {
-  }
-
-  private static <T> Consumer<T> noopConsumer() {
-    return new Consumer<T>() {
-      @Override
-      public void accept(T item) {
-      }
-    };
-  }
-
-  private static boolean returnTrue() {
-    return true;
-  }
-
-  private static Supplier<Boolean> alwaysTrueSupplier() {
-    return new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        return true;
-      }
-    };
-  }
-
-  /** A {@link CallStreamObserver} which executes the supplied callbacks. */
-  private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> {
-    private final Consumer<T> onNext;
-    private final Supplier<Boolean> isReady;
-    private final Consumer<Throwable> onError;
-    private final Runnable onCompleted;
-
-    public ForwardingCallStreamObserver(
-        Consumer<T> onNext,
-        Consumer<Throwable> onError,
-        Runnable onCompleted,
-        Supplier<Boolean> isReady) {
-      this.onNext = onNext;
-      this.onError = onError;
-      this.onCompleted = onCompleted;
-      this.isReady = isReady;
-    }
-
-    @Override
-    public void onNext(T value) {
-      onNext.accept(value);
-    }
-
-    @Override
-    public void onError(Throwable t) {
-      onError.accept(t);
-    }
-
-    @Override
-    public void onCompleted() {
-      onCompleted.run();
-    }
-
-    @Override
-    public boolean isReady() {
-      return isReady.get();
-    }
-
-    @Override
-    public void setOnReadyHandler(Runnable onReadyHandler) {}
-
-    @Override
-    public void disableAutoInboundFlowControl() {}
-
-    @Override
-    public void request(int count) {}
-
-    @Override
-    public void setMessageCompression(boolean enable) {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
deleted file mode 100644
index c578397..0000000
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.harness.test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link TestStreams}. */
-@RunWith(JUnit4.class)
-public class TestStreamsTest {
-  @Test
-  public void testOnNextIsCalled() {
-    final AtomicBoolean onNextWasCalled = new AtomicBoolean();
-    TestStreams.withOnNext(new Consumer<Boolean>() {
-      @Override
-      public void accept(Boolean item) {
-        onNextWasCalled.set(item);
-      }
-    }).build().onNext(true);
-    assertTrue(onNextWasCalled.get());
-  }
-
-  @Test
-  public void testIsReadyIsCalled() {
-    final AtomicBoolean isReadyWasCalled = new AtomicBoolean();
-    assertFalse(TestStreams.withOnNext(null)
-        .withIsReady(new Supplier<Boolean>() {
-          @Override
-          public Boolean get() {
-            return isReadyWasCalled.getAndSet(true);
-          }
-        })
-        .build()
-        .isReady());
-    assertTrue(isReadyWasCalled.get());
-  }
-
-  @Test
-  public void testOnCompletedIsCalled() {
-    final AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
-    TestStreams.withOnNext(null)
-        .withOnCompleted(new Runnable() {
-          @Override
-          public void run() {
-            onCompletedWasCalled.set(true);
-          }
-        })
-        .build()
-        .onCompleted();
-    assertTrue(onCompletedWasCalled.get());
-  }
-
-  @Test
-  public void testOnErrorRunnableIsCalled() {
-    RuntimeException throwable = new RuntimeException();
-    final AtomicBoolean onErrorWasCalled = new AtomicBoolean();
-    TestStreams.withOnNext(null)
-        .withOnError(new Runnable() {
-          @Override
-          public void run() {
-            onErrorWasCalled.set(true);
-          }
-        })
-        .build()
-        .onError(throwable);
-    assertTrue(onErrorWasCalled.get());
-  }
-
-  @Test
-  public void testOnErrorConsumerIsCalled() {
-    RuntimeException throwable = new RuntimeException();
-    final Collection<Throwable> onErrorWasCalled = new ArrayList<>();
-    TestStreams.withOnNext(null)
-        .withOnError(new Consumer<Throwable>() {
-          @Override
-          public void accept(Throwable item) {
-            onErrorWasCalled.add(item);
-          }
-        })
-        .build()
-        .onError(throwable);
-    assertThat(onErrorWasCalled, Matchers.<Throwable>contains(throwable));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java
new file mode 100644
index 0000000..3248ab2
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/AdvancingPhaserTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link AdvancingPhaser}: after one arrival the phaser must not be terminated. */
+@RunWith(JUnit4.class)
+public class AdvancingPhaserTest {
+  @Test
+  public void testAdvancement() throws Exception {
+    final AdvancingPhaser phaser = new AdvancingPhaser(1); // NOTE(review): 1 = parties? confirm
+    int currentPhase = phaser.getPhase(); // read before the arrival is scheduled
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    service.submit(new Runnable() {
+      @Override
+      public void run() {
+        phaser.arrive(); // the arrival happens on the executor's thread
+      }
+    });
+    phaser.awaitAdvance(currentPhase); // presumably blocks until that phase is left; confirm
+    assertFalse(phaser.isTerminated()); // the arrival must not have terminated the phaser
+    service.shutdown();
+    if (!service.awaitTermination(10, TimeUnit.SECONDS)) { // allow the worker 10s to finish
+      assertThat(service.shutdownNow(), empty()); // no submitted task may be left unstarted
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java
new file mode 100644
index 0000000..54d02b8
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.fn.test.Consumer;
+import org.apache.beam.sdk.fn.test.Supplier;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BufferingStreamObserver}: non-overlapping delegate calls and isReady. */
+@RunWith(JUnit4.class)
+public class BufferingStreamObserverTest {
+  @Rule public TestExecutorService executor = TestExecutors.from(Executors.newCachedThreadPool());
+
+  @Test
+  public void testThreadSafety() throws Exception { // 5 producers x 10 elements, one observer
+    final List<String> onNextValues = new ArrayList<>(); // filled only from inside the delegate
+    AdvancingPhaser phaser = new AdvancingPhaser(1);
+    final AtomicBoolean isCriticalSectionShared = new AtomicBoolean();
+    final BufferingStreamObserver<String> streamObserver =
+        new BufferingStreamObserver<>(
+            phaser,
+            TestStreams.withOnNext(
+                new Consumer<String>() {
+                  @Override
+                  public void accept(String t) {
+                    // Use the atomic boolean to detect if multiple threads are in this
+                    // critical section. Any thread that enters purposefully blocks by sleeping
+                    // to increase the contention between threads artificially.
+                    assertFalse(isCriticalSectionShared.getAndSet(true));
+                    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+                    onNextValues.add(t);
+                    assertTrue(isCriticalSectionShared.getAndSet(false));
+                  }
+                }).build(),
+            executor,
+            3); // NOTE(review): presumably the buffer size; confirm against the constructor
+
+    List<String> prefixes = ImmutableList.of("0", "1", "2", "3", "4"); // one prefix per producer
+    List<Callable<String>> tasks = new ArrayList<>();
+    for (final String prefix : prefixes) {
+      tasks.add(
+          new Callable<String>() {
+            @Override
+            public String call() throws Exception {
+              for (int i = 0; i < 10; i++) {
+                streamObserver.onNext(prefix + i); // element = producer prefix + sequence digit
+              }
+              return prefix;
+            }
+          });
+    }
+    List<Future<String>> results = executor.invokeAll(tasks);
+    for (Future<String> result : results) {
+      result.get(); // rethrows anything a producer task threw
+    }
+    streamObserver.onCompleted();
+
+    // Check that each producer's elements were delivered in the order they were sent.
+    int[] prefixesIndex = new int[prefixes.size()]; // next expected suffix for each prefix
+    assertEquals(50, onNextValues.size()); // 5 prefixes x 10 values each
+    for (String onNextValue : onNextValues) {
+      int prefix = Integer.parseInt(onNextValue.substring(0, 1));
+      int suffix = Integer.parseInt(onNextValue.substring(1, 2));
+      assertEquals(prefixesIndex[prefix], suffix);
+      prefixesIndex[prefix] += 1;
+    }
+  }
+
+  @Test
+  public void testIsReadyIsHonored() throws Exception { // no delegate call while isReady is false
+    AdvancingPhaser phaser = new AdvancingPhaser(1);
+    final AtomicBoolean elementsAllowed = new AtomicBoolean(); // starts false: delegate not ready
+    final BufferingStreamObserver<String> streamObserver =
+        new BufferingStreamObserver<>(
+            phaser,
+            TestStreams.withOnNext(
+                    new Consumer<String>() {
+                      @Override
+                      public void accept(String t) {
+                        assertTrue(elementsAllowed.get()); // fails if an element arrives early
+                      }
+                    })
+                .withIsReady(
+                    new Supplier<Boolean>() {
+                      @Override
+                      public Boolean get() {
+                        return elementsAllowed.get();
+                      }
+                    })
+                .build(),
+            executor,
+            3); // NOTE(review): presumably the buffer size; confirm against the constructor
+
+    // Start all the tasks
+    List<Future<String>> results = new ArrayList<>();
+    for (final String prefix : ImmutableList.of("0", "1", "2", "3", "4")) {
+      results.add(
+          executor.submit(
+              new Callable<String>() {
+                @Override
+                public String call() throws Exception {
+                  for (int i = 0; i < 10; i++) {
+                    streamObserver.onNext(prefix + i);
+                  }
+                  return prefix;
+                }
+              }));
+    }
+
+    // Have them wait and then flip that we do allow elements and wake up those awaiting
+    Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+    elementsAllowed.set(true); // set before arriving so woken waiters observe the new state
+    phaser.arrive();
+
+    for (Future<String> result : results) {
+      result.get(); // rethrows anything a producer task threw
+    }
+    streamObserver.onCompleted();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
new file mode 100644
index 0000000..852b3d0
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.fn.stream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assume.assumeTrue;
+
+import com.google.common.collect.Iterators;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.stream.DataStreams.BlockingQueueIterator;
+import org.apache.beam.sdk.fn.stream.DataStreams.DataStreamDecoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DataStreams}, grouped into one nested JUnit class per component. */
+@RunWith(Enclosed.class)
+public class DataStreamsTest {
+
+  /** Tests for {@link DataStreams.Inbound}. */
+  @RunWith(JUnit4.class)
+  public static class InboundTest {
+    private static final ByteString BYTES_A = ByteString.copyFromUtf8("TestData");
+    private static final ByteString BYTES_B = ByteString.copyFromUtf8("SomeOtherTestData");
+
+    @Test
+    public void testEmptyRead() throws Exception { // zero chunks or only empty chunks read as empty
+      assertEquals(ByteString.EMPTY, read());
+      assertEquals(ByteString.EMPTY, read(ByteString.EMPTY));
+      assertEquals(ByteString.EMPTY, read(ByteString.EMPTY, ByteString.EMPTY));
+    }
+
+    @Test
+    public void testRead() throws Exception { // empty chunks must not change the concatenation
+      assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B));
+      assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, ByteString.EMPTY, BYTES_B));
+      assertEquals(BYTES_A.concat(BYTES_B), read(BYTES_A, BYTES_B, ByteString.EMPTY));
+    }
+
+    private static ByteString read(ByteString... bytes) throws IOException { // chunks -> one value
+      return ByteString.readFrom(DataStreams.inbound(Arrays.asList(bytes).iterator()));
+    }
+  }
+
+  /** Tests for {@link DataStreams.BlockingQueueIterator}. */
+  @RunWith(JUnit4.class)
+  public static class BlockingQueueIteratorTest {
+    @Test(timeout = 10_000)
+    public void testBlockingQueueIteratorWithoutBlocking() throws Exception {
+      BlockingQueueIterator<String> iterator =
+          new BlockingQueueIterator<>(new ArrayBlockingQueue<String>(3)); // room for both elements
+
+      iterator.accept("A");
+      iterator.accept("B");
+      iterator.close(); // closed before any element is read back
+
+      assertEquals(Arrays.asList("A", "B"),
+          Arrays.asList(Iterators.toArray(iterator, String.class)));
+    }
+
+    @Test(timeout = 10_000)
+    public void testBlockingQueueIteratorWithBlocking() throws Exception {
+      // The synchronous queue only allows for one element to transfer at a time and blocks
+      // the sending/receiving parties until both parties are there.
+      final BlockingQueueIterator<String> iterator =
+          new BlockingQueueIterator<>(new SynchronousQueue<String>());
+      final SettableFuture<List<String>> valuesFuture = SettableFuture.create();
+      Thread appender = new Thread() { // NOTE: despite its name, this thread drains the iterator
+        @Override
+        public void run() {
+          valuesFuture.set(Arrays.asList(Iterators.toArray(iterator, String.class)));
+        }
+      };
+      appender.start();
+      iterator.accept("A"); // elements are produced from the test thread
+      iterator.accept("B");
+      iterator.close();
+      assertEquals(Arrays.asList("A", "B"), valuesFuture.get());
+      appender.join();
+    }
+  }
+
+  /** Tests for {@link DataStreams.DataStreamDecoder}. */
+  @RunWith(JUnit4.class)
+  public static class DataStreamDecoderTest {
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testEmptyInputStream() throws Exception { // no values: decoder yields nothing
+      testDecoderWith(StringUtf8Coder.of());
+    }
+
+    @Test
+    public void testNonEmptyInputStream() throws Exception {
+      testDecoderWith(StringUtf8Coder.of(), "A", "BC", "DEF", "GHIJ");
+    }
+
+    @Test
+    public void testNonEmptyInputStreamWithZeroLengthCoder() throws Exception {
+      CountingOutputStream countingOutputStream =
+          new CountingOutputStream(ByteStreams.nullOutputStream());
+      GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, countingOutputStream);
+      assumeTrue(countingOutputStream.getCount() == 0); // skip unless the encoding is 0 bytes
+
+      testDecoderWith(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, GlobalWindow.INSTANCE);
+    }
+
+    private <T> void testDecoderWith(Coder<T> coder, T... expected) throws IOException {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(); // holds the encoded input
+      for (T value : expected) {
+        int size = baos.size(); // size before encoding, to detect zero-length encodings
+        coder.encode(value, baos);
+        // Pad an arbitrary byte when values encode to zero bytes
+        if (baos.size() - size == 0) {
+          baos.write(0);
+        }
+      }
+
+      Iterator<T> decoder =
+          new DataStreamDecoder<>(coder, new ByteArrayInputStream(baos.toByteArray()));
+
+      Object[] actual = Iterators.toArray(decoder, Object.class); // drains the decoder fully
+      assertArrayEquals(expected, actual);
+
+      assertFalse(decoder.hasNext());
+      assertFalse(decoder.hasNext()); // asserted twice: hasNext() must stay false when repeated
+
+      thrown.expect(NoSuchElementException.class); // next() on the drained decoder must throw
+      decoder.next();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java
new file mode 100644
index 0000000..d59dfbc
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DirectStreamObserverTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.fn.test.Consumer;
+import org.apache.beam.sdk.fn.test.Supplier;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DirectStreamObserver}.
+ *
+ * <p>The tests drive one observer from several producer threads and check that calls into the
+ * delegate never overlap, that each producer's elements keep their order, and that no element
+ * reaches the delegate while it reports that it is not ready.
+ */
+@RunWith(JUnit4.class)
+public class DirectStreamObserverTest {
+  @Rule public TestExecutorService executor = TestExecutors.from(Executors.newCachedThreadPool());
+
+  /**
+   * Sends 10 elements from each of 5 producers and verifies that the delegate is never entered by
+   * two threads at once and that every producer's elements arrive in sending order.
+   */
+  @Test
+  public void testThreadSafety() throws Exception {
+    final List<String> onNextValues = new ArrayList<>();
+    AdvancingPhaser phaser = new AdvancingPhaser(1);
+    final AtomicBoolean isCriticalSectionShared = new AtomicBoolean();
+    final DirectStreamObserver<String> streamObserver =
+        new DirectStreamObserver<>(
+            phaser,
+            TestStreams.withOnNext(
+                new Consumer<String>() {
+                  @Override
+                  public void accept(String t) {
+                    // Use the atomic boolean to detect if multiple threads are in this
+                    // critical section. Any thread that enters purposefully blocks by sleeping
+                    // to increase the contention between threads artificially.
+                    assertFalse(isCriticalSectionShared.getAndSet(true));
+                    Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+                    onNextValues.add(t);
+                    assertTrue(isCriticalSectionShared.getAndSet(false));
+                  }
+                }).build());
+
+    List<String> prefixes = ImmutableList.of("0", "1", "2", "3", "4");
+    List<Callable<String>> tasks = new ArrayList<>();
+    for (final String prefix : prefixes) {
+      tasks.add(
+          new Callable<String>() {
+            @Override
+            public String call() throws Exception {
+              for (int i = 0; i < 10; i++) {
+                streamObserver.onNext(prefix + i);
+              }
+              return prefix;
+            }
+          });
+    }
+    // Inspect every future: an assertion failure raised inside the delegate happens on a worker
+    // thread and is only reported to the test if the corresponding Future is queried.
+    for (Future<String> result : executor.invokeAll(tasks)) {
+      result.get();
+    }
+    streamObserver.onCompleted();
+
+    // Check that each producer's elements were delivered in the order they were sent.
+    int[] prefixesIndex = new int[prefixes.size()]; // next expected suffix for each prefix
+    assertEquals(50, onNextValues.size()); // 5 prefixes x 10 values each
+    for (String onNextValue : onNextValues) {
+      int prefix = Integer.parseInt(onNextValue.substring(0, 1));
+      int suffix = Integer.parseInt(onNextValue.substring(1, 2));
+      assertEquals(prefixesIndex[prefix], suffix);
+      prefixesIndex[prefix] += 1;
+    }
+  }
+
+  /**
+   * Starts producers while the delegate reports not-ready, then flips readiness and arrives at the
+   * phaser; the delegate asserts that it only ever receives elements once readiness is set.
+   */
+  @Test
+  public void testIsReadyIsHonored() throws Exception {
+    AdvancingPhaser phaser = new AdvancingPhaser(1);
+    final AtomicBoolean elementsAllowed = new AtomicBoolean(); // starts false: delegate not ready
+    final DirectStreamObserver<String> streamObserver =
+        new DirectStreamObserver<>(
+            phaser,
+            TestStreams.withOnNext(
+                    new Consumer<String>() {
+                      @Override
+                      public void accept(String t) {
+                        assertTrue(elementsAllowed.get());
+                      }
+                    })
+                .withIsReady(
+                    new Supplier<Boolean>() {
+                      @Override
+                      public Boolean get() {
+                        return elementsAllowed.get();
+                      }
+                    })
+                .build());
+
+    // Start all the tasks
+    List<Future<String>> results = new ArrayList<>();
+    for (final String prefix : ImmutableList.of("0", "1", "2", "3", "4")) {
+      results.add(
+          executor.submit(
+              new Callable<String>() {
+                @Override
+                public String call() throws Exception {
+                  for (int i = 0; i < 10; i++) {
+                    streamObserver.onNext(prefix + i);
+                  }
+                  return prefix;
+                }
+              }));
+    }
+
+    // Have them wait and then flip that we do allow elements and wake up those awaiting
+    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+    elementsAllowed.set(true);
+    phaser.arrive();
+
+    for (Future<String> result : results) {
+      result.get(); // rethrows anything a producer task threw
+    }
+    streamObserver.onCompleted();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java
new file mode 100644
index 0000000..a71841d
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/ForwardingClientResponseObserverTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.stream;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import io.grpc.stub.StreamObserver;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ForwardingClientResponseObserver}. */
+@RunWith(JUnit4.class)
+public class ForwardingClientResponseObserverTest {
+  @Test
+  public void testCallsAreForwardedAndOnReadyHandlerBound() {
+    @SuppressWarnings("unchecked")
+    StreamObserver<Object> delegateObserver = mock(StreamObserver.class); // receives forwarded calls
+    @SuppressWarnings("unchecked")
+    ClientCallStreamObserver<Object> callStreamObserver =
+        mock(ClientCallStreamObserver.class); // passed to beforeStart below
+    Runnable onReadyHandler = new Runnable() { // no-op: only its identity is verified below
+      @Override
+      public void run() {
+      }
+    };
+    ClientResponseObserver<Object, Object> observer =
+        new ForwardingClientResponseObserver<>(delegateObserver, onReadyHandler);
+    observer.onNext("A");
+    verify(delegateObserver).onNext("A"); // same element must reach the delegate
+    Throwable t = new RuntimeException();
+    observer.onError(t);
+    verify(delegateObserver).onError(t); // same throwable must reach the delegate
+    observer.onCompleted();
+    verify(delegateObserver).onCompleted();
+    observer.beforeStart(callStreamObserver);
+    verify(callStreamObserver).setOnReadyHandler(onReadyHandler); // handler installed on the call
+    verifyNoMoreInteractions(delegateObserver, callStreamObserver); // nothing beyond the above
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java
new file mode 100644
index 0000000..184a6e2
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Consumer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.test;
+
+/**
+ * A fork of the Java 8 Consumer interface. This exists to enable migration for existing consumers.
+ */
+public interface Consumer<T> {
+  void accept(T item); // takes a single item of type T and returns nothing
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f9e2be91/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java
new file mode 100644
index 0000000..b0bae2e
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/Supplier.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.test;
+
+/**
+ * A fork of the Java 8 Supplier interface, to enable migrations.
+ */
+public interface Supplier<T> {
+  T get(); // takes no arguments and returns a value of type T
+}