You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/11/03 01:25:25 UTC

[1/4] beam git commit: This closes #3986

Repository: beam
Updated Branches:
  refs/heads/master fbcb0ba68 -> 4f4632ccc


This closes #3986


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f4632cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f4632cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f4632cc

Branch: refs/heads/master
Commit: 4f4632cccef6e1f0890bfc333534c186a2065aeb
Parents: fbcb0ba 927a8db
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 2 18:25:12 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 2 18:25:12 2017 -0700

----------------------------------------------------------------------
 ...job_beam_PostCommit_Java_MavenInstall.groovy |   2 +-
 .../job_beam_PreCommit_Java_MavenInstall.groovy |   4 +-
 pom.xml                                         |  67 +++++++-
 runners/java-fn-execution/pom.xml               | 105 ++++++++++++
 .../beam/runners/fnexecution/ServerFactory.java | 104 ++++++++++++
 .../beam/runners/fnexecution/package-info.java  |  23 +++
 .../runners/fnexecution/ServerFactoryTest.java  | 128 +++++++++++++++
 runners/pom.xml                                 |   1 +
 sdks/java/fn-execution/pom.xml                  |  96 +++++++++++
 .../harness/channel/ManagedChannelFactory.java  |  82 ++++++++++
 .../harness/channel/SocketAddressFactory.java   |  64 ++++++++
 .../beam/harness/channel/package-info.java      |  22 +++
 .../channel/ManagedChannelFactoryTest.java      |  71 ++++++++
 .../channel/SocketAddressFactoryTest.java       |  56 +++++++
 .../apache/beam/harness/test/TestExecutors.java |  85 ++++++++++
 .../beam/harness/test/TestExecutorsTest.java    | 160 ++++++++++++++++++
 .../apache/beam/harness/test/TestStreams.java   | 162 +++++++++++++++++++
 .../beam/harness/test/TestStreamsTest.java      |  84 ++++++++++
 sdks/java/harness/pom.xml                       |  26 +++
 .../org/apache/beam/fn/harness/FnHarness.java   |  19 ++-
 .../harness/channel/ManagedChannelFactory.java  |  86 ----------
 .../harness/channel/SocketAddressFactory.java   |  64 --------
 .../beam/fn/harness/channel/package-info.java   |  22 ---
 .../fn/harness/BeamFnDataReadRunnerTest.java    |   4 +-
 .../apache/beam/fn/harness/FnHarnessTest.java   |   2 +-
 .../channel/ManagedChannelFactoryTest.java      |  73 ---------
 .../channel/SocketAddressFactoryTest.java       |  56 -------
 .../control/BeamFnControlClientTest.java        |   2 +-
 .../fn/harness/control/RegisterHandlerTest.java |   4 +-
 ...BeamFnDataBufferingOutboundObserverTest.java |   2 +-
 .../harness/data/BeamFnDataGrpcClientTest.java  |   2 +-
 .../data/BeamFnDataGrpcMultiplexerTest.java     |   2 +-
 .../logging/BeamFnLoggingClientTest.java        |   2 +-
 .../state/BeamFnStateGrpcClientCacheTest.java   |   2 +-
 .../stream/BufferingStreamObserverTest.java     |   6 +-
 .../stream/DirectStreamObserverTest.java        |   6 +-
 .../beam/fn/harness/test/TestExecutors.java     |  85 ----------
 .../beam/fn/harness/test/TestExecutorsTest.java | 160 ------------------
 .../beam/fn/harness/test/TestStreams.java       | 162 -------------------
 .../beam/fn/harness/test/TestStreamsTest.java   |  84 ----------
 sdks/java/pom.xml                               |   1 +
 41 files changed, 1370 insertions(+), 818 deletions(-)
----------------------------------------------------------------------



[2/4] beam git commit: Add a runners/java-fn-execution module

Posted by tg...@apache.org.
Add a runners/java-fn-execution module

This contains libraries for runner authors to create Fn API services and
RPCs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/927a8db1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/927a8db1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/927a8db1

Branch: refs/heads/master
Commit: 927a8db1397bc43c6cb253d6ca856afdbfa472a3
Parents: fdd5971
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 11 18:47:51 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 2 18:25:12 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   6 +
 runners/java-fn-execution/pom.xml               | 105 +++++++++++++++
 .../beam/runners/fnexecution/ServerFactory.java | 104 +++++++++++++++
 .../beam/runners/fnexecution/package-info.java  |  23 ++++
 .../runners/fnexecution/ServerFactoryTest.java  | 128 +++++++++++++++++++
 runners/pom.xml                                 |   1 +
 6 files changed, 367 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f1eee91..b2ab5d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -684,6 +684,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-runners-java-fn-execution</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-runners-reference-job-orchestrator</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
new file mode 100644
index 0000000..bd4fcf0
--- /dev/null
+++ b/runners/java-fn-execution/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-runners-parent</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-java-fn-execution</artifactId>
+
+  <name>Apache Beam :: Runners :: Java Fn Execution</name>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <!--  Override Beam parent to allow Java8 -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-model-pipeline</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-model-fn-execution</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-fn-execution</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-fn-execution</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
new file mode 100644
index 0000000..918672a
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.net.HostAndPort;
+import io.grpc.BindableService;
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.beam.harness.channel.SocketAddressFactory;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+
+/**
+ * A {@link Server gRPC server} factory.
+ */
+public abstract class ServerFactory {
+  /**
+   * Create a default {@link ServerFactory}.
+   */
+  public static ServerFactory createDefault() {
+    return new InetSocketAddressServerFactory();
+  }
+
+  /**
+   * Creates an instance of this server using an ephemeral port chosen automatically. The chosen
+   * port is accessible to the caller from the URL set in the input {@link
+   * Endpoints.ApiServiceDescriptor.Builder}.
+   */
+  public abstract Server allocatePortAndCreate(
+      BindableService service, Endpoints.ApiServiceDescriptor.Builder builder) throws IOException;
+
+  /**
+   * Creates an instance of this server at the address specified by the given service descriptor.
+   */
+  public abstract Server create(
+      BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor) throws IOException;
+
+  /**
+   * Creates a {@link Server gRPC Server} using the default server factory.
+   *
+   * <p>The server is created listening any open port on "localhost".
+   */
+  public static class InetSocketAddressServerFactory extends ServerFactory {
+    private InetSocketAddressServerFactory() {}
+
+    @Override
+    public Server allocatePortAndCreate(
+        BindableService service, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+        throws IOException {
+      InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+      Server server = createServer(service, address);
+      apiServiceDescriptor.setUrl(
+          HostAndPort.fromParts(address.getHostName(), server.getPort()).toString());
+      return server;
+    }
+
+    @Override
+    public Server create(BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor)
+        throws IOException {
+      SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+      checkArgument(
+          socketAddress instanceof InetSocketAddress,
+          "%s %s requires a host:port socket address, got %s",
+          getClass().getSimpleName(), ServerFactory.class.getSimpleName(),
+          serviceDescriptor.getUrl());
+      return createServer(service, (InetSocketAddress) socketAddress);
+    }
+
+    private static Server createServer(BindableService service, InetSocketAddress socket)
+        throws IOException {
+      Server server =
+          NettyServerBuilder.forPort(socket.getPort())
+              .addService(service)
+              // Set the message size to max value here. The actual size is governed by the
+              // buffer size in the layers above.
+              .maxMessageSize(Integer.MAX_VALUE)
+              .build();
+      server.start();
+      return server;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java
new file mode 100644
index 0000000..bc36f5e
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities used by runners to interact with the fn execution components of the Beam Portability
+ * Framework.
+ */
+package org.apache.beam.runners.fnexecution;

http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
new file mode 100644
index 0000000..aa8d246
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.harness.channel.ManagedChannelFactory;
+import org.apache.beam.harness.test.TestStreams;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ServerFactory}.
+ */
+public class ServerFactoryTest {
+
+  private static final BeamFnApi.Elements CLIENT_DATA = BeamFnApi.Elements.newBuilder()
+      .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1"))
+      .build();
+  private static final BeamFnApi.Elements SERVER_DATA = BeamFnApi.Elements.newBuilder()
+      .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1"))
+      .build();
+
+  @Test
+  public void testCreatingDefaultServer() throws Exception {
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        runTestUsing(ServerFactory.createDefault(), ManagedChannelFactory.createDefault());
+    HostAndPort hostAndPort = HostAndPort.fromString(apiServiceDescriptor.getUrl());
+    assertThat(hostAndPort.getHost(), anyOf(
+        equalTo(InetAddress.getLoopbackAddress().getHostName()),
+        equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
+    assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
+  }
+
+  private Endpoints.ApiServiceDescriptor runTestUsing(
+      ServerFactory serverFactory, ManagedChannelFactory channelFactory) throws Exception {
+    Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
+        Endpoints.ApiServiceDescriptor.newBuilder();
+
+    Collection<Elements> serverElements = new ArrayList<>();
+    CountDownLatch clientHangedUp = new CountDownLatch(1);
+    CallStreamObserver<Elements> serverInboundObserver =
+        TestStreams.withOnNext(serverElements::add)
+        .withOnCompleted(clientHangedUp::countDown)
+        .build();
+    TestDataService service = new TestDataService(serverInboundObserver);
+    Server server = serverFactory.allocatePortAndCreate(service, apiServiceDescriptorBuilder);
+    assertFalse(server.isShutdown());
+
+    ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());
+    BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel);
+    Collection<BeamFnApi.Elements> clientElements = new ArrayList<>();
+    CountDownLatch serverHangedUp = new CountDownLatch(1);
+    CallStreamObserver<BeamFnApi.Elements> clientInboundObserver =
+        TestStreams.withOnNext(clientElements::add)
+        .withOnCompleted(serverHangedUp::countDown)
+        .build();
+
+    StreamObserver<Elements> clientOutboundObserver = stub.data(clientInboundObserver);
+    StreamObserver<BeamFnApi.Elements> serverOutboundObserver = service.outboundObservers.take();
+
+    clientOutboundObserver.onNext(CLIENT_DATA);
+    serverOutboundObserver.onNext(SERVER_DATA);
+    clientOutboundObserver.onCompleted();
+    clientHangedUp.await();
+    serverOutboundObserver.onCompleted();
+    serverHangedUp.await();
+
+    assertThat(clientElements, contains(SERVER_DATA));
+    assertThat(serverElements, contains(CLIENT_DATA));
+
+    return apiServiceDescriptorBuilder.build();
+  }
+
+  /** A test gRPC service that uses the provided inbound observer for all clients. */
+  private static class TestDataService extends BeamFnDataGrpc.BeamFnDataImplBase {
+    private final LinkedBlockingQueue<StreamObserver<BeamFnApi.Elements>> outboundObservers;
+    private final StreamObserver<BeamFnApi.Elements> inboundObserver;
+    private TestDataService(StreamObserver<BeamFnApi.Elements> inboundObserver) {
+      this.inboundObserver = inboundObserver;
+      this.outboundObservers = new LinkedBlockingQueue<>();
+    }
+
+    @Override
+    public StreamObserver<BeamFnApi.Elements> data(
+        StreamObserver<BeamFnApi.Elements> outboundObserver) {
+      Uninterruptibles.putUninterruptibly(outboundObservers, outboundObserver);
+      return inboundObserver;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/927a8db1/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 164d1b3..df3faa9 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -63,6 +63,7 @@
         <jdk>[1.8,)</jdk>
       </activation>
       <modules>
+        <module>java-fn-execution</module>
         <module>gearpump</module>
       </modules>
     </profile>


[4/4] beam git commit: Add sdks/java/fn-execution

Posted by tg...@apache.org.
Add sdks/java/fn-execution

This module contains java code shared by both the SDK harness and
Runners. It does not depend on the Java SDK. Runner harness
libraries will depend on this module, and are not permitted to interact
with SDK constructs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdd5971d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdd5971d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdd5971d

Branch: refs/heads/master
Commit: fdd5971d95df3219954023c6806f09dce87a1f8c
Parents: fbcb0ba
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 11 18:46:27 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 2 18:25:12 2017 -0700

----------------------------------------------------------------------
 ...job_beam_PostCommit_Java_MavenInstall.groovy |   2 +-
 .../job_beam_PreCommit_Java_MavenInstall.groovy |   4 +-
 pom.xml                                         |  61 ++++++-
 sdks/java/fn-execution/pom.xml                  |  96 +++++++++++
 .../harness/channel/ManagedChannelFactory.java  |  82 ++++++++++
 .../harness/channel/SocketAddressFactory.java   |  64 ++++++++
 .../beam/harness/channel/package-info.java      |  22 +++
 .../channel/ManagedChannelFactoryTest.java      |  71 ++++++++
 .../channel/SocketAddressFactoryTest.java       |  56 +++++++
 .../apache/beam/harness/test/TestExecutors.java |  85 ++++++++++
 .../beam/harness/test/TestExecutorsTest.java    | 160 ++++++++++++++++++
 .../apache/beam/harness/test/TestStreams.java   | 162 +++++++++++++++++++
 .../beam/harness/test/TestStreamsTest.java      |  84 ++++++++++
 sdks/java/harness/pom.xml                       |  26 +++
 .../org/apache/beam/fn/harness/FnHarness.java   |  19 ++-
 .../harness/channel/ManagedChannelFactory.java  |  86 ----------
 .../harness/channel/SocketAddressFactory.java   |  64 --------
 .../beam/fn/harness/channel/package-info.java   |  22 ---
 .../fn/harness/BeamFnDataReadRunnerTest.java    |   4 +-
 .../apache/beam/fn/harness/FnHarnessTest.java   |   2 +-
 .../channel/ManagedChannelFactoryTest.java      |  73 ---------
 .../channel/SocketAddressFactoryTest.java       |  56 -------
 .../control/BeamFnControlClientTest.java        |   2 +-
 .../fn/harness/control/RegisterHandlerTest.java |   4 +-
 ...BeamFnDataBufferingOutboundObserverTest.java |   2 +-
 .../harness/data/BeamFnDataGrpcClientTest.java  |   2 +-
 .../data/BeamFnDataGrpcMultiplexerTest.java     |   2 +-
 .../logging/BeamFnLoggingClientTest.java        |   2 +-
 .../state/BeamFnStateGrpcClientCacheTest.java   |   2 +-
 .../stream/BufferingStreamObserverTest.java     |   6 +-
 .../stream/DirectStreamObserverTest.java        |   6 +-
 .../beam/fn/harness/test/TestExecutors.java     |  85 ----------
 .../beam/fn/harness/test/TestExecutorsTest.java | 160 ------------------
 .../beam/fn/harness/test/TestStreams.java       | 162 -------------------
 .../beam/fn/harness/test/TestStreamsTest.java   |  84 ----------
 sdks/java/pom.xml                               |   1 +
 36 files changed, 1003 insertions(+), 818 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
index 5d67e6d..0dda772 100644
--- a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
+++ b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
@@ -47,7 +47,7 @@ mavenJob('beam_PostCommit_Java_MavenInstall') {
   goals([
       'clean',
       'install',
-      '--projects sdks/java/core,runners/direct-java',
+      '--projects sdks/java/core,runners/direct-java,sdks/java/fn-execution',
       ' --also-make',
       '--also-make-dependents',
       '--batch-mode',

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/.test-infra/jenkins/job_beam_PreCommit_Java_MavenInstall.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PreCommit_Java_MavenInstall.groovy b/.test-infra/jenkins/job_beam_PreCommit_Java_MavenInstall.groovy
index 52423e0..0775e2f 100644
--- a/.test-infra/jenkins/job_beam_PreCommit_Java_MavenInstall.groovy
+++ b/.test-infra/jenkins/job_beam_PreCommit_Java_MavenInstall.groovy
@@ -38,14 +38,14 @@ mavenJob('beam_PreCommit_Java_MavenInstall') {
   common_job_properties.setMavenConfig(delegate)
 
   // Sets that this is a PreCommit job.
-  common_job_properties.setPreCommit(delegate, 'mvn clean install -pl sdks/java/core,runners/direct-java -am -amd', 'Run Java PreCommit')
+  common_job_properties.setPreCommit(delegate, 'mvn clean install -pl sdks/java/core,runners/direct-java,sdks/java/fn-execution -am -amd', 'Run Java PreCommit')
 
   // Maven goals for this job: The Java SDK, its dependencies, and things that depend on it.
   goals([
     '--batch-mode',
     '--errors',
     '--activate-profiles release,jenkins-precommit,direct-runner,dataflow-runner,spark-runner,flink-runner,apex-runner',
-    '--projects sdks/java/core,runners/direct-java',
+    '--projects sdks/java/core,runners/direct-java,sdks/java/fn-execution',
     '--also-make',
     '--also-make-dependents',
     '-D pullRequest=$ghprbPullId',

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fcd0339..f1eee91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -289,6 +289,52 @@
     </profile>
 
     <profile>
+      <id>java8-enable-like-dependencies</id>
+      <activation>
+        <jdk>[1.8,)</jdk>
+      </activation>
+      <build>
+        <plugins>
+          <!-- Override Beam parent to allow Java8 dependencies -->
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <version>${maven-enforcer-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>enforce</id>
+                <goals>
+                  <goal>enforce</goal>
+                </goals>
+                <configuration>
+                  <rules>
+                    <enforceBytecodeVersion>
+                      <maxJdkVersion>1.8</maxJdkVersion>
+                      <excludes>
+                        <!--
+                          Supplied by the user JDK and compiled with matching
+                          version. Is not shaded, so safe to ignore.
+                        -->
+                        <exclude>jdk.tools:jdk.tools</exclude>
+                      </excludes>
+                    </enforceBytecodeVersion>
+                    <requireJavaVersion>
+                      <version>[1.7,)</version>
+                    </requireJavaVersion>
+                    <requireMavenVersion>
+                      <!-- Keep aligned with preqrequisite section below. -->
+                      <version>[3.2,)</version>
+                    </requireMavenVersion>
+                  </rules>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <profile>
       <id>doclint-java8-disable</id>
       <activation>
         <jdk>[1.8,)</jdk>
@@ -462,6 +508,19 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-fn-execution</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-fn-execution</artifactId>
+        <version>${project.version}</version>
+        <type>test-jar</type>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-harness</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -601,7 +660,7 @@
       <dependency>
         <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
-	    <version>${project.version}</version>
+        <version>${project.version}</version>
       </dependency>
 
       <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/pom.xml b/sdks/java/fn-execution/pom.xml
new file mode 100644
index 0000000..9929c29
--- /dev/null
+++ b/sdks/java/fn-execution/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-parent</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-fn-execution</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Harness Core</name>
+  <description>Contains code shared across the Beam Java SDK Harness and the Java Runner Harness
+    libraries.
+  </description>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <!--  Override Beam parent to allow Java8 -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-model-pipeline</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport-native-epoll</artifactId>
+      <classifier>linux-x86_64</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java
new file mode 100644
index 0000000..187cfdb
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/ManagedChannelFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.channel;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.channel.epoll.EpollDomainSocketChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.unix.DomainSocketAddress;
+import java.net.SocketAddress;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+
+/**
+ * A Factory which creates an underlying {@link ManagedChannel} implementation.
+ */
+public abstract class ManagedChannelFactory {
+  public static ManagedChannelFactory createDefault() {
+    return new Default();
+  }
+
+  public static ManagedChannelFactory createEpoll() {
+    io.netty.channel.epoll.Epoll.ensureAvailability();
+    return new Epoll();
+  }
+
+  public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor);
+
+  /**
+   * Creates a {@link ManagedChannel} backed by an {@link EpollDomainSocketChannel} if the address
+   * is a {@link DomainSocketAddress}. Otherwise creates a {@link ManagedChannel} backed by an
+   * {@link EpollSocketChannel}.
+   */
+  private static class Epoll extends ManagedChannelFactory {
+    @Override
+    public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
+      SocketAddress address = SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl());
+      return NettyChannelBuilder.forAddress(address)
+          .channelType(address instanceof DomainSocketAddress
+              ? EpollDomainSocketChannel.class : EpollSocketChannel.class)
+          .eventLoopGroup(new EpollEventLoopGroup())
+          .usePlaintext(true)
+          // Set the message size to max value here. The actual size is governed by the
+          // buffer size in the layers above.
+          .maxInboundMessageSize(Integer.MAX_VALUE)
+          .build();
+    }
+  }
+
+  /**
+   * Creates a {@link ManagedChannel} relying on the {@link ManagedChannelBuilder} to create
+   * instances.
+   */
+  private static class Default extends ManagedChannelFactory {
+    @Override
+    public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
+      return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
+          .usePlaintext(true)
+          // Set the message size to max value here. The actual size is governed by the
+          // buffer size in the layers above.
+          .maxInboundMessageSize(Integer.MAX_VALUE)
+          .build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java
new file mode 100644
index 0000000..5253291
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/SocketAddressFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.channel;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.net.HostAndPort;
+import io.netty.channel.unix.DomainSocketAddress;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/** Creates a {@link SocketAddress} based upon a supplied string. */
+public class SocketAddressFactory {
+  private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://";
+
+  /**
+   * Parse a {@link SocketAddress} from the given string.
+   */
+  public static SocketAddress createFrom(String value) {
+    if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
+      // Unix Domain Socket address.
+      // Create the underlying file for the Unix Domain Socket.
+      String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
+      File file = new File(filePath);
+      if (!file.isAbsolute()) {
+        throw new IllegalArgumentException("File path must be absolute: " + filePath);
+      }
+      try {
+        if (file.createNewFile()) {
+          // If this application created the file, delete it when the application exits.
+          file.deleteOnExit();
+        }
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+      // Create the SocketAddress referencing the file.
+      return new DomainSocketAddress(file);
+    } else {
+      // Standard TCP/IP address.
+      HostAndPort hostAndPort = HostAndPort.fromString(value);
+      checkArgument(hostAndPort.hasPort(),
+          "Address must be a unix:// path or be in the form host:port. Got: %s", value);
+      return new InetSocketAddress(hostAndPort.getHostText(), hostAndPort.getPort());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java
new file mode 100644
index 0000000..2a33445
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/harness/channel/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * gRPC channel management.
+ */
+package org.apache.beam.harness.channel;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java
new file mode 100644
index 0000000..f73ed80
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/ManagedChannelFactoryTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.channel;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+import io.grpc.ManagedChannel;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+
+/** Tests for {@link ManagedChannelFactory}. */
+@RunWith(JUnit4.class)
+public class ManagedChannelFactoryTest {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testDefaultChannel() {
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build();
+    ManagedChannel channel =
+        ManagedChannelFactory.createDefault().forDescriptor(apiServiceDescriptor);
+    assertEquals("localhost:123", channel.authority());
+    channel.shutdownNow();
+  }
+
+  @Test
+  public void testEpollHostPortChannel() {
+    assumeTrue(io.netty.channel.epoll.Epoll.isAvailable());
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build();
+    ManagedChannel channel =
+        ManagedChannelFactory.createEpoll().forDescriptor(apiServiceDescriptor);
+    assertEquals("localhost:123", channel.authority());
+    channel.shutdownNow();
+  }
+
+  @Test
+  public void testEpollDomainSocketChannel() throws Exception {
+    assumeTrue(io.netty.channel.epoll.Epoll.isAvailable());
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
+            .setUrl("unix://" + tmpFolder.newFile().getAbsolutePath())
+            .build();
+    ManagedChannel channel =
+        ManagedChannelFactory.createEpoll().forDescriptor(apiServiceDescriptor);
+    assertEquals(apiServiceDescriptor.getUrl().substring("unix://".length()), channel.authority());
+    channel.shutdownNow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java
new file mode 100644
index 0000000..95a7d67
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/channel/SocketAddressFactoryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.channel;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import io.netty.channel.unix.DomainSocketAddress;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SocketAddressFactory}. */
+@RunWith(JUnit4.class)
+public class SocketAddressFactoryTest {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testHostPortSocket() {
+    SocketAddress socketAddress = SocketAddressFactory.createFrom("localhost:123");
+    assertThat(socketAddress, Matchers.instanceOf(InetSocketAddress.class));
+    assertEquals("localhost", ((InetSocketAddress) socketAddress).getHostString());
+    assertEquals(123, ((InetSocketAddress) socketAddress).getPort());
+  }
+
+  @Test
+  public void testDomainSocket() throws Exception {
+    File tmpFile = tmpFolder.newFile();
+    SocketAddress socketAddress = SocketAddressFactory.createFrom(
+        "unix://" + tmpFile.getAbsolutePath());
+    assertThat(socketAddress, Matchers.instanceOf(DomainSocketAddress.class));
+    assertEquals(tmpFile.getAbsolutePath(), ((DomainSocketAddress) socketAddress).path());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
new file mode 100644
index 0000000..d818a61
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.test;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * A {@link TestRule} that validates that all submitted tasks finished and were completed. This
+ * allows for testing that tasks have exercised the appropriate shutdown logic.
+ */
+public class TestExecutors {
+  public static TestExecutorService from(Supplier<ExecutorService> executorServiceSuppler) {
+    return new FromSupplier(executorServiceSuppler);
+  }
+
+  /** A union of the {@link ExecutorService} and {@link TestRule} interfaces. */
+  public interface TestExecutorService extends ExecutorService, TestRule {}
+
+  private static class FromSupplier extends ForwardingExecutorService
+      implements TestExecutorService {
+    private final Supplier<ExecutorService> executorServiceSupplier;
+    private ExecutorService delegate;
+
+    private FromSupplier(Supplier<ExecutorService> executorServiceSupplier) {
+      this.executorServiceSupplier = executorServiceSupplier;
+    }
+
+    @Override
+    public Statement apply(Statement statement, Description arg1) {
+      return new Statement() {
+        @Override
+        public void evaluate() throws Throwable {
+          Throwable thrown = null;
+          delegate = executorServiceSupplier.get();
+          try {
+            statement.evaluate();
+          } catch (Throwable t) {
+            thrown = t;
+          }
+          shutdown();
+          if (!awaitTermination(5, TimeUnit.SECONDS)) {
+            shutdownNow();
+            IllegalStateException e =
+                new IllegalStateException("Test executor failed to shutdown cleanly.");
+            if (thrown != null) {
+              thrown.addSuppressed(e);
+            } else {
+              thrown = e;
+            }
+          }
+          if (thrown != null) {
+            throw thrown;
+          }
+        }
+      };
+    }
+
+    @Override
+    protected ExecutorService delegate() {
+      return delegate;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
new file mode 100644
index 0000000..1381b55
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.model.Statement;
+
+/** Tests for {@link TestExecutors}. */
+@RunWith(JUnit4.class)
+public class TestExecutorsTest {
+  @Test
+  public void testSuccessfulTermination() throws Throwable {
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    final TestExecutorService testService = TestExecutors.from(() -> service);
+    final AtomicBoolean taskRan = new AtomicBoolean();
+    testService
+        .apply(
+            new Statement() {
+              @Override
+              public void evaluate() throws Throwable {
+                testService.submit(() -> taskRan.set(true));
+              }
+            },
+            null)
+        .evaluate();
+    assertTrue(service.isTerminated());
+    assertTrue(taskRan.get());
+  }
+
+  @Test
+  public void testTaskBlocksForeverCausesFailure() throws Throwable {
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    final TestExecutorService testService = TestExecutors.from(() -> service);
+    final AtomicBoolean taskStarted = new AtomicBoolean();
+    final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
+    try {
+      testService
+          .apply(
+              new Statement() {
+                @Override
+                public void evaluate() throws Throwable {
+                  testService.submit(this::taskToRun);
+                }
+
+                private void taskToRun() {
+                  taskStarted.set(true);
+                  try {
+                    while (true) {
+                      Thread.sleep(10000);
+                    }
+                  } catch (InterruptedException e) {
+                    taskWasInterrupted.set(true);
+                    return;
+                  }
+                }
+              },
+              null)
+          .evaluate();
+      fail();
+    } catch (IllegalStateException e) {
+      assertEquals(IllegalStateException.class, e.getClass());
+      assertEquals("Test executor failed to shutdown cleanly.", e.getMessage());
+    }
+    assertTrue(service.isShutdown());
+  }
+
+  @Test
+  public void testStatementFailurePropagatedCleanly() throws Throwable {
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    final TestExecutorService testService = TestExecutors.from(() -> service);
+    final RuntimeException exceptionToThrow = new RuntimeException();
+    try {
+      testService
+          .apply(
+              new Statement() {
+                @Override
+                public void evaluate() throws Throwable {
+                  throw exceptionToThrow;
+                }
+              },
+              null)
+          .evaluate();
+      fail();
+    } catch (RuntimeException thrownException) {
+      assertSame(exceptionToThrow, thrownException);
+    }
+    assertTrue(service.isShutdown());
+  }
+
+  @Test
+  public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate()
+      throws Throwable {
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    final TestExecutorService testService = TestExecutors.from(() -> service);
+    final AtomicBoolean taskStarted = new AtomicBoolean();
+    final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
+    final RuntimeException exceptionToThrow = new RuntimeException();
+    try {
+      testService
+          .apply(
+              new Statement() {
+                @Override
+                public void evaluate() throws Throwable {
+                  testService.submit(this::taskToRun);
+                  throw exceptionToThrow;
+                }
+
+                private void taskToRun() {
+                  taskStarted.set(true);
+                  try {
+                    while (true) {
+                      Thread.sleep(10000);
+                    }
+                  } catch (InterruptedException e) {
+                    taskWasInterrupted.set(true);
+                    return;
+                  }
+                }
+              },
+              null)
+          .evaluate();
+      fail();
+    } catch (RuntimeException thrownException) {
+      assertSame(exceptionToThrow, thrownException);
+      assertEquals(1, exceptionToThrow.getSuppressed().length);
+      assertEquals(IllegalStateException.class, exceptionToThrow.getSuppressed()[0].getClass());
+      assertEquals(
+          "Test executor failed to shutdown cleanly.",
+          exceptionToThrow.getSuppressed()[0].getMessage());
+    }
+    assertTrue(service.isShutdown());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
new file mode 100644
index 0000000..a7b362d
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.test;
+
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/** Utility methods which enable testing of {@link StreamObserver}s. */
+public class TestStreams {
+  /**
+   * Creates a test {@link CallStreamObserver}  {@link Builder} that forwards
+   * {@link StreamObserver#onNext} calls to the supplied {@link Consumer}.
+   */
+  public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
+    return new Builder<>(new ForwardingCallStreamObserver<>(
+        onNext,
+        TestStreams::noop,
+        TestStreams::noop,
+        TestStreams::returnTrue));
+  }
+
+  /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */
+  public static class Builder<T> {
+    private final ForwardingCallStreamObserver<T> observer;
+    private Builder(ForwardingCallStreamObserver<T> observer) {
+      this.observer = observer;
+    }
+
+    /**
+     * Returns a new {@link Builder} like this one with the specified
+     * {@link CallStreamObserver#isReady} callback.
+     */
+    public Builder<T> withIsReady(Supplier<Boolean> isReady) {
+      return new Builder<>(new ForwardingCallStreamObserver<>(
+          observer.onNext,
+          observer.onError,
+          observer.onCompleted,
+          isReady));
+    }
+
+    /**
+     * Returns a new {@link Builder} like this one with the specified
+     * {@link StreamObserver#onCompleted} callback.
+     */
+    public Builder<T> withOnCompleted(Runnable onCompleted) {
+      return new Builder<>(new ForwardingCallStreamObserver<>(
+          observer.onNext,
+          observer.onError,
+          onCompleted,
+          observer.isReady));
+    }
+
+    /**
+     * Returns a new {@link Builder} like this one with the specified
+     * {@link StreamObserver#onError} callback.
+     */
+    public Builder<T> withOnError(Runnable onError) {
+      return new Builder<>(new ForwardingCallStreamObserver<>(
+          observer.onNext,
+          new Consumer<Throwable>() {
+            @Override
+            public void accept(Throwable t) {
+              onError.run();
+            }
+          },
+          observer.onCompleted,
+          observer.isReady));
+    }
+
+    /**
+     * Returns a new {@link Builder} like this one with the specified
+     * {@link StreamObserver#onError} consumer.
+     */
+    public Builder<T> withOnError(Consumer<Throwable> onError) {
+      return new Builder<>(new ForwardingCallStreamObserver<>(
+          observer.onNext, onError, observer.onCompleted, observer.isReady));
+    }
+
+    public CallStreamObserver<T> build() {
+      return observer;
+    }
+  }
+
+  private static void noop() {
+  }
+
+  private static void noop(Throwable t) {
+  }
+
+  private static boolean returnTrue() {
+    return true;
+  }
+
+  /** A {@link CallStreamObserver} which executes the supplied callbacks. */
+  private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> {
+    private final Consumer<T> onNext;
+    private final Supplier<Boolean> isReady;
+    private final Consumer<Throwable> onError;
+    private final Runnable onCompleted;
+
+    public ForwardingCallStreamObserver(
+        Consumer<T> onNext,
+        Consumer<Throwable> onError,
+        Runnable onCompleted,
+        Supplier<Boolean> isReady) {
+      this.onNext = onNext;
+      this.onError = onError;
+      this.onCompleted = onCompleted;
+      this.isReady = isReady;
+    }
+
+    @Override
+    public void onNext(T value) {
+      onNext.accept(value);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      onError.accept(t);
+    }
+
+    @Override
+    public void onCompleted() {
+      onCompleted.run();
+    }
+
+    @Override
+    public boolean isReady() {
+      return isReady.get();
+    }
+
+    @Override
+    public void setOnReadyHandler(Runnable onReadyHandler) {}
+
+    @Override
+    public void disableAutoInboundFlowControl() {}
+
+    @Override
+    public void request(int count) {}
+
+    @Override
+    public void setMessageCompression(boolean enable) {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
new file mode 100644
index 0000000..f5741ae
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.harness.test;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link TestStreams}. */
+@RunWith(JUnit4.class)
+public class TestStreamsTest {
+  @Test
+  public void testOnNextIsCalled() {
+    AtomicBoolean onNextWasCalled = new AtomicBoolean();
+    TestStreams.withOnNext(onNextWasCalled::set).build().onNext(true);
+    assertTrue(onNextWasCalled.get());
+  }
+
+  @Test
+  public void testIsReadyIsCalled() {
+    final AtomicBoolean isReadyWasCalled = new AtomicBoolean();
+    assertFalse(TestStreams.withOnNext(null)
+        .withIsReady(() -> isReadyWasCalled.getAndSet(true))
+        .build()
+        .isReady());
+    assertTrue(isReadyWasCalled.get());
+  }
+
+  @Test
+  public void testOnCompletedIsCalled() {
+    AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
+    TestStreams.withOnNext(null)
+        .withOnCompleted(() -> onCompletedWasCalled.set(true))
+        .build()
+        .onCompleted();
+    assertTrue(onCompletedWasCalled.get());
+  }
+
+  @Test
+  public void testOnErrorRunnableIsCalled() {
+    RuntimeException throwable = new RuntimeException();
+    AtomicBoolean onErrorWasCalled = new AtomicBoolean();
+    TestStreams.withOnNext(null)
+        .withOnError(() -> onErrorWasCalled.set(true))
+        .build()
+        .onError(throwable);
+    assertTrue(onErrorWasCalled.get());
+  }
+
+  @Test
+  public void testOnErrorConsumerIsCalled() {
+    RuntimeException throwable = new RuntimeException();
+    Collection<Throwable> onErrorWasCalled = new ArrayList<>();
+    TestStreams.withOnNext(null)
+        .withOnError(onErrorWasCalled::add)
+        .build()
+        .onError(throwable);
+    assertThat(onErrorWasCalled, contains(throwable));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index e60c8a4..3707730 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -36,6 +36,17 @@
       <plugins>
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-dependency-plugin</artifactId>
+          <configuration>
+            <ignoredUnusedDeclaredDependencies>
+              <ignoredUnusedDeclaredDependency>
+                com.google.protobuf:protobuf-java
+              </ignoredUnusedDeclaredDependency>
+            </ignoredUnusedDeclaredDependencies>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-shade-plugin</artifactId>
           <executions>
             <execution>
@@ -139,6 +150,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-fn-execution</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 
@@ -155,6 +171,14 @@
       <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
     </dependency>
 
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-fn-execution</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-core-java</artifactId>
@@ -205,6 +229,7 @@
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-netty</artifactId>
+      <scope>runtime</scope>
     </dependency>
 
     <dependency>
@@ -216,6 +241,7 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-transport-native-epoll</artifactId>
       <classifier>linux-x86_64</classifier>
+      <scope>runtime</scope>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 7d78856..e1790fa 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -21,7 +21,7 @@ package org.apache.beam.fn.harness;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.TextFormat;
 import java.util.EnumMap;
-import org.apache.beam.fn.harness.channel.ManagedChannelFactory;
+import java.util.List;
 import org.apache.beam.fn.harness.control.BeamFnControlClient;
 import org.apache.beam.fn.harness.control.ProcessBundleHandler;
 import org.apache.beam.fn.harness.control.RegisterHandler;
@@ -30,9 +30,13 @@ import org.apache.beam.fn.harness.fn.ThrowingFunction;
 import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.fn.harness.stream.StreamObserverFactory;
+import org.apache.beam.harness.channel.ManagedChannelFactory;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse.Builder;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.slf4j.Logger;
@@ -90,7 +94,13 @@ public class FnHarness {
   public static void main(PipelineOptions options,
       Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
       Endpoints.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception {
-    ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options);
+    ManagedChannelFactory channelFactory;
+    List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
+    if (experiments != null && experiments.contains("beam_fn_api_epoll")) {
+      channelFactory = ManagedChannelFactory.createEpoll();
+    } else {
+      channelFactory = ManagedChannelFactory.createDefault();
+    }
     StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options);
     try (BeamFnLoggingClient logging = new BeamFnLoggingClient(
         options,
@@ -99,9 +109,8 @@ public class FnHarness {
 
       LOG.info("Fn Harness started");
       EnumMap<BeamFnApi.InstructionRequest.RequestCase,
-              ThrowingFunction<BeamFnApi.InstructionRequest,
-                               BeamFnApi.InstructionResponse.Builder>> handlers =
-          new EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class);
+              ThrowingFunction<InstructionRequest, Builder>>
+          handlers = new EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class);
 
       RegisterHandler fnApiRegistry = new RegisterHandler();
       BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient(

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
deleted file mode 100644
index 0c615a9..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.channel;
-
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.netty.NettyChannelBuilder;
-import io.netty.channel.epoll.EpollDomainSocketChannel;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.unix.DomainSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-import org.apache.beam.sdk.options.ExperimentalOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Uses {@link PipelineOptions} to configure which underlying {@link ManagedChannel} implementation
- * to use.
- */
-public abstract class ManagedChannelFactory {
-  public static ManagedChannelFactory from(PipelineOptions options) {
-    List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
-    if (experiments != null && experiments.contains("beam_fn_api_epoll")) {
-      io.netty.channel.epoll.Epoll.ensureAvailability();
-      return new Epoll();
-    }
-    return new Default();
-  }
-
-  public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor);
-
-  /**
-   * Creates a {@link ManagedChannel} backed by an {@link EpollDomainSocketChannel} if the address
-   * is a {@link DomainSocketAddress}. Otherwise creates a {@link ManagedChannel} backed by an
-   * {@link EpollSocketChannel}.
-   */
-  private static class Epoll extends ManagedChannelFactory {
-    @Override
-    public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
-      SocketAddress address = SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl());
-      return NettyChannelBuilder.forAddress(address)
-          .channelType(address instanceof DomainSocketAddress
-              ? EpollDomainSocketChannel.class : EpollSocketChannel.class)
-          .eventLoopGroup(new EpollEventLoopGroup())
-          .usePlaintext(true)
-          // Set the message size to max value here. The actual size is governed by the
-          // buffer size in the layers above.
-          .maxInboundMessageSize(Integer.MAX_VALUE)
-          .build();
-    }
-  }
-
-  /**
-   * Creates a {@link ManagedChannel} relying on the {@link ManagedChannelBuilder} to create
-   * instances.
-   */
-  private static class Default extends ManagedChannelFactory {
-    @Override
-    public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
-      return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
-          .usePlaintext(true)
-          // Set the message size to max value here. The actual size is governed by the
-          // buffer size in the layers above.
-          .maxInboundMessageSize(Integer.MAX_VALUE)
-          .build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java
deleted file mode 100644
index a27d542..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.channel;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.net.HostAndPort;
-import io.netty.channel.unix.DomainSocketAddress;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
-/** Creates a {@link SocketAddress} based upon a supplied string. */
-public class SocketAddressFactory {
-  private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://";
-
-  /**
-   * Parse a {@link SocketAddress} from the given string.
-   */
-  public static SocketAddress createFrom(String value) {
-    if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
-      // Unix Domain Socket address.
-      // Create the underlying file for the Unix Domain Socket.
-      String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
-      File file = new File(filePath);
-      if (!file.isAbsolute()) {
-        throw new IllegalArgumentException("File path must be absolute: " + filePath);
-      }
-      try {
-        if (file.createNewFile()) {
-          // If this application created the file, delete it when the application exits.
-          file.deleteOnExit();
-        }
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-      // Create the SocketAddress referencing the file.
-      return new DomainSocketAddress(file);
-    } else {
-      // Standard TCP/IP address.
-      HostAndPort hostAndPort = HostAndPort.fromString(value);
-      checkArgument(hostAndPort.hasPort(),
-          "Address must be a unix:// path or be in the form host:port. Got: %s", value);
-      return new InetSocketAddress(hostAndPort.getHostText(), hostAndPort.getPort());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java
deleted file mode 100644
index 6323166..0000000
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * gRPC channel management.
- */
-package org.apache.beam.fn.harness.channel;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index de68d41..f00346d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -49,8 +49,8 @@ import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.fn.harness.test.TestExecutors;
-import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
+import org.apache.beam.harness.test.TestExecutors;
+import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.model.pipeline.v1.RunnerApi;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
index fc89acf..66c31a8 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
@@ -29,7 +29,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Consumer;
-import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java
deleted file mode 100644
index 6f27e21..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/ManagedChannelFactoryTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.channel;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeTrue;
-
-import io.grpc.ManagedChannel;
-import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link ManagedChannelFactory}. */
-@RunWith(JUnit4.class)
-public class ManagedChannelFactoryTest {
-  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testDefaultChannel() {
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build();
-    ManagedChannel channel = ManagedChannelFactory.from(PipelineOptionsFactory.create())
-        .forDescriptor(apiServiceDescriptor);
-    assertEquals("localhost:123", channel.authority());
-    channel.shutdownNow();
-  }
-
-  @Test
-  public void testEpollHostPortChannel() {
-    assumeTrue(io.netty.channel.epoll.Epoll.isAvailable());
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:123").build();
-    ManagedChannel channel = ManagedChannelFactory.from(
-        PipelineOptionsFactory.fromArgs(new String[]{ "--experiments=beam_fn_api_epoll" }).create())
-        .forDescriptor(apiServiceDescriptor);
-    assertEquals("localhost:123", channel.authority());
-    channel.shutdownNow();
-  }
-
-  @Test
-  public void testEpollDomainSocketChannel() throws Exception {
-    assumeTrue(io.netty.channel.epoll.Epoll.isAvailable());
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        Endpoints.ApiServiceDescriptor.newBuilder()
-            .setUrl("unix://" + tmpFolder.newFile().getAbsolutePath())
-            .build();
-    ManagedChannel channel = ManagedChannelFactory.from(
-        PipelineOptionsFactory.fromArgs(new String[]{ "--experiments=beam_fn_api_epoll" }).create())
-        .forDescriptor(apiServiceDescriptor);
-    assertEquals(apiServiceDescriptor.getUrl().substring("unix://".length()), channel.authority());
-    channel.shutdownNow();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/SocketAddressFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/SocketAddressFactoryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/SocketAddressFactoryTest.java
deleted file mode 100644
index 610a8ea..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/channel/SocketAddressFactoryTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.channel;
-
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import io.netty.channel.unix.DomainSocketAddress;
-import java.io.File;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link SocketAddressFactory}. */
-@RunWith(JUnit4.class)
-public class SocketAddressFactoryTest {
-  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testHostPortSocket() {
-    SocketAddress socketAddress = SocketAddressFactory.createFrom("localhost:123");
-    assertThat(socketAddress, instanceOf(InetSocketAddress.class));
-    assertEquals("localhost", ((InetSocketAddress) socketAddress).getHostString());
-    assertEquals(123, ((InetSocketAddress) socketAddress).getPort());
-  }
-
-  @Test
-  public void testDomainSocket() throws Exception {
-    File tmpFile = tmpFolder.newFile();
-    SocketAddress socketAddress = SocketAddressFactory.createFrom(
-        "unix://" + tmpFile.getAbsolutePath());
-    assertThat(socketAddress, instanceOf(DomainSocketAddress.class));
-    assertEquals(tmpFile.getAbsolutePath(), ((DomainSocketAddress) socketAddress).path());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
index 8dc62b3..56ae7ed 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
@@ -39,7 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.beam.fn.harness.fn.ThrowingFunction;
-import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
index 40b2145..aa1a504 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertEquals;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import org.apache.beam.fn.harness.test.TestExecutors;
-import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
+import org.apache.beam.harness.test.TestExecutors;
+import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.RegisterResponse;
 import org.apache.beam.model.pipeline.v1.RunnerApi;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
index 4898b90..81b1aa4 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
@@ -29,7 +29,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
-import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
index 5e2545d..7df8925 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -45,7 +45,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
-import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
index c63dd62..6a12ed0 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
@@ -30,7 +30,7 @@ import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index c9057ea..1e68b18 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.LogManager;
 import java.util.logging.LogRecord;
-import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
index e8c616d..12c9c43 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
@@ -40,7 +40,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Function;
 import org.apache.beam.fn.harness.IdGenerator;
-import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.harness.test.TestStreams;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
index b26e8e1..3f66c4c 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java
@@ -32,9 +32,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
-import org.apache.beam.fn.harness.test.TestExecutors;
-import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
-import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.harness.test.TestExecutors;
+import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
+import org.apache.beam.harness.test.TestStreams;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
index b5d3ec1..120a73d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java
@@ -32,9 +32,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
-import org.apache.beam.fn.harness.test.TestExecutors;
-import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
-import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.harness.test.TestExecutors;
+import org.apache.beam.harness.test.TestExecutors.TestExecutorService;
+import org.apache.beam.harness.test.TestStreams;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutors.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutors.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutors.java
deleted file mode 100644
index f846466..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutors.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.test;
-
-import com.google.common.util.concurrent.ForwardingExecutorService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-/**
- * A {@link TestRule} that validates that all submitted tasks finished and were completed. This
- * allows for testing that tasks have exercised the appropriate shutdown logic.
- */
-public class TestExecutors {
-  public static TestExecutorService from(Supplier<ExecutorService> executorServiceSuppler) {
-    return new FromSupplier(executorServiceSuppler);
-  }
-
-  /** A union of the {@link ExecutorService} and {@link TestRule} interfaces. */
-  public interface TestExecutorService extends ExecutorService, TestRule {}
-
-  private static class FromSupplier extends ForwardingExecutorService
-      implements TestExecutorService {
-    private final Supplier<ExecutorService> executorServiceSupplier;
-    private ExecutorService delegate;
-
-    private FromSupplier(Supplier<ExecutorService> executorServiceSupplier) {
-      this.executorServiceSupplier = executorServiceSupplier;
-    }
-
-    @Override
-    public Statement apply(Statement statement, Description arg1) {
-      return new Statement() {
-        @Override
-        public void evaluate() throws Throwable {
-          Throwable thrown = null;
-          delegate = executorServiceSupplier.get();
-          try {
-            statement.evaluate();
-          } catch (Throwable t) {
-            thrown = t;
-          }
-          shutdown();
-          if (!awaitTermination(5, TimeUnit.SECONDS)) {
-            shutdownNow();
-            IllegalStateException e =
-                new IllegalStateException("Test executor failed to shutdown cleanly.");
-            if (thrown != null) {
-              thrown.addSuppressed(e);
-            } else {
-              thrown = e;
-            }
-          }
-          if (thrown != null) {
-            throw thrown;
-          }
-        }
-      };
-    }
-
-    @Override
-    protected ExecutorService delegate() {
-      return delegate;
-    }
-  }
-}


[3/4] beam git commit: Add sdks/java/fn-execution

Posted by tg...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
deleted file mode 100644
index 85c64d0..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.junit.runners.model.Statement;
-
-/** Tests for {@link TestExecutors}. */
-@RunWith(JUnit4.class)
-public class TestExecutorsTest {
-  @Test
-  public void testSuccessfulTermination() throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
-    final AtomicBoolean taskRan = new AtomicBoolean();
-    testService
-        .apply(
-            new Statement() {
-              @Override
-              public void evaluate() throws Throwable {
-                testService.submit(() -> taskRan.set(true));
-              }
-            },
-            null)
-        .evaluate();
-    assertTrue(service.isTerminated());
-    assertTrue(taskRan.get());
-  }
-
-  @Test
-  public void testTaskBlocksForeverCausesFailure() throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
-    final AtomicBoolean taskStarted = new AtomicBoolean();
-    final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
-    try {
-      testService
-          .apply(
-              new Statement() {
-                @Override
-                public void evaluate() throws Throwable {
-                  testService.submit(this::taskToRun);
-                }
-
-                private void taskToRun() {
-                  taskStarted.set(true);
-                  try {
-                    while (true) {
-                      Thread.sleep(10000);
-                    }
-                  } catch (InterruptedException e) {
-                    taskWasInterrupted.set(true);
-                    return;
-                  }
-                }
-              },
-              null)
-          .evaluate();
-      fail();
-    } catch (IllegalStateException e) {
-      assertEquals(IllegalStateException.class, e.getClass());
-      assertEquals("Test executor failed to shutdown cleanly.", e.getMessage());
-    }
-    assertTrue(service.isShutdown());
-  }
-
-  @Test
-  public void testStatementFailurePropagatedCleanly() throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
-    final RuntimeException exceptionToThrow = new RuntimeException();
-    try {
-      testService
-          .apply(
-              new Statement() {
-                @Override
-                public void evaluate() throws Throwable {
-                  throw exceptionToThrow;
-                }
-              },
-              null)
-          .evaluate();
-      fail();
-    } catch (RuntimeException thrownException) {
-      assertSame(exceptionToThrow, thrownException);
-    }
-    assertTrue(service.isShutdown());
-  }
-
-  @Test
-  public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate()
-      throws Throwable {
-    ExecutorService service = Executors.newSingleThreadExecutor();
-    final TestExecutorService testService = TestExecutors.from(() -> service);
-    final AtomicBoolean taskStarted = new AtomicBoolean();
-    final AtomicBoolean taskWasInterrupted = new AtomicBoolean();
-    final RuntimeException exceptionToThrow = new RuntimeException();
-    try {
-      testService
-          .apply(
-              new Statement() {
-                @Override
-                public void evaluate() throws Throwable {
-                  testService.submit(this::taskToRun);
-                  throw exceptionToThrow;
-                }
-
-                private void taskToRun() {
-                  taskStarted.set(true);
-                  try {
-                    while (true) {
-                      Thread.sleep(10000);
-                    }
-                  } catch (InterruptedException e) {
-                    taskWasInterrupted.set(true);
-                    return;
-                  }
-                }
-              },
-              null)
-          .evaluate();
-      fail();
-    } catch (RuntimeException thrownException) {
-      assertSame(exceptionToThrow, thrownException);
-      assertEquals(1, exceptionToThrow.getSuppressed().length);
-      assertEquals(IllegalStateException.class, exceptionToThrow.getSuppressed()[0].getClass());
-      assertEquals(
-          "Test executor failed to shutdown cleanly.",
-          exceptionToThrow.getSuppressed()[0].getMessage());
-    }
-    assertTrue(service.isShutdown());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
deleted file mode 100644
index f398286..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.test;
-
-import io.grpc.stub.CallStreamObserver;
-import io.grpc.stub.StreamObserver;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-/** Utility methods which enable testing of {@link StreamObserver}s. */
-public class TestStreams {
-  /**
-   * Creates a test {@link CallStreamObserver}  {@link Builder} that forwards
-   * {@link StreamObserver#onNext} calls to the supplied {@link Consumer}.
-   */
-  public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
-    return new Builder<>(new ForwardingCallStreamObserver<>(
-        onNext,
-        TestStreams::noop,
-        TestStreams::noop,
-        TestStreams::returnTrue));
-  }
-
-  /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */
-  public static class Builder<T> {
-    private final ForwardingCallStreamObserver<T> observer;
-    private Builder(ForwardingCallStreamObserver<T> observer) {
-      this.observer = observer;
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link CallStreamObserver#isReady} callback.
-     */
-    public Builder<T> withIsReady(Supplier<Boolean> isReady) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext,
-          observer.onError,
-          observer.onCompleted,
-          isReady));
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link StreamObserver#onCompleted} callback.
-     */
-    public Builder<T> withOnCompleted(Runnable onCompleted) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext,
-          observer.onError,
-          onCompleted,
-          observer.isReady));
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link StreamObserver#onError} callback.
-     */
-    public Builder<T> withOnError(Runnable onError) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext,
-          new Consumer<Throwable>() {
-            @Override
-            public void accept(Throwable t) {
-              onError.run();
-            }
-          },
-          observer.onCompleted,
-          observer.isReady));
-    }
-
-    /**
-     * Returns a new {@link Builder} like this one with the specified
-     * {@link StreamObserver#onError} consumer.
-     */
-    public Builder<T> withOnError(Consumer<Throwable> onError) {
-      return new Builder<>(new ForwardingCallStreamObserver<>(
-          observer.onNext, onError, observer.onCompleted, observer.isReady));
-    }
-
-    public CallStreamObserver<T> build() {
-      return observer;
-    }
-  }
-
-  private static void noop() {
-  }
-
-  private static void noop(Throwable t) {
-  }
-
-  private static boolean returnTrue() {
-    return true;
-  }
-
-  /** A {@link CallStreamObserver} which executes the supplied callbacks. */
-  private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> {
-    private final Consumer<T> onNext;
-    private final Supplier<Boolean> isReady;
-    private final Consumer<Throwable> onError;
-    private final Runnable onCompleted;
-
-    public ForwardingCallStreamObserver(
-        Consumer<T> onNext,
-        Consumer<Throwable> onError,
-        Runnable onCompleted,
-        Supplier<Boolean> isReady) {
-      this.onNext = onNext;
-      this.onError = onError;
-      this.onCompleted = onCompleted;
-      this.isReady = isReady;
-    }
-
-    @Override
-    public void onNext(T value) {
-      onNext.accept(value);
-    }
-
-    @Override
-    public void onError(Throwable t) {
-      onError.accept(t);
-    }
-
-    @Override
-    public void onCompleted() {
-      onCompleted.run();
-    }
-
-    @Override
-    public boolean isReady() {
-      return isReady.get();
-    }
-
-    @Override
-    public void setOnReadyHandler(Runnable onReadyHandler) {}
-
-    @Override
-    public void disableAutoInboundFlowControl() {}
-
-    @Override
-    public void request(int count) {}
-
-    @Override
-    public void setMessageCompression(boolean enable) {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
deleted file mode 100644
index b684c90..0000000
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.fn.harness.test;
-
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link TestStreams}. */
-@RunWith(JUnit4.class)
-public class TestStreamsTest {
-  @Test
-  public void testOnNextIsCalled() {
-    AtomicBoolean onNextWasCalled = new AtomicBoolean();
-    TestStreams.withOnNext(onNextWasCalled::set).build().onNext(true);
-    assertTrue(onNextWasCalled.get());
-  }
-
-  @Test
-  public void testIsReadyIsCalled() {
-    final AtomicBoolean isReadyWasCalled = new AtomicBoolean();
-    assertFalse(TestStreams.withOnNext(null)
-        .withIsReady(() -> isReadyWasCalled.getAndSet(true))
-        .build()
-        .isReady());
-    assertTrue(isReadyWasCalled.get());
-  }
-
-  @Test
-  public void testOnCompletedIsCalled() {
-    AtomicBoolean onCompletedWasCalled = new AtomicBoolean();
-    TestStreams.withOnNext(null)
-        .withOnCompleted(() -> onCompletedWasCalled.set(true))
-        .build()
-        .onCompleted();
-    assertTrue(onCompletedWasCalled.get());
-  }
-
-  @Test
-  public void testOnErrorRunnableIsCalled() {
-    RuntimeException throwable = new RuntimeException();
-    AtomicBoolean onErrorWasCalled = new AtomicBoolean();
-    TestStreams.withOnNext(null)
-        .withOnError(() -> onErrorWasCalled.set(true))
-        .build()
-        .onError(throwable);
-    assertTrue(onErrorWasCalled.get());
-  }
-
-  @Test
-  public void testOnErrorConsumerIsCalled() {
-    RuntimeException throwable = new RuntimeException();
-    Collection<Throwable> onErrorWasCalled = new ArrayList<>();
-    TestStreams.withOnNext(null)
-        .withOnError(onErrorWasCalled::add)
-        .build()
-        .onError(throwable);
-    assertThat(onErrorWasCalled, contains(throwable));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index e5af784..62e4ec3 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -53,6 +53,7 @@
         <jdk>[1.8,)</jdk>
       </activation>
       <modules>
+        <module>fn-execution</module>
         <module>harness</module>
         <module>container</module>
         <module>java8tests</module>