You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/11/09 00:43:55 UTC

[2/3] beam git commit: Fork Control Clients into java-fn-execution

Fork Control Clients into java-fn-execution

Deprecate versions in runners-core. Runner-side portability APIs should
not have a dependency edge to an SDK, and use of the java-fn-execution
package ensures that.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ed655be
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ed655be
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ed655be

Branch: refs/heads/master
Commit: 9ed655be780630e1218d185bd0d2ebfea099b988
Parents: 29399fd
Author: Thomas Groh <tg...@google.com>
Authored: Mon Nov 6 15:03:17 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 8 16:43:42 2017 -0800

----------------------------------------------------------------------
 .../runners/core/fn/FnApiControlClient.java     |   4 +
 .../core/fn/FnApiControlClientPoolService.java  |   8 +-
 .../beam/runners/core/fn/SdkHarnessClient.java  |   4 +
 runners/java-fn-execution/pom.xml               |  15 ++
 .../fnexecution/control/FnApiControlClient.java | 148 ++++++++++++++++
 .../control/FnApiControlClientPoolService.java  |  66 +++++++
 .../fnexecution/control/SdkHarnessClient.java   | 173 +++++++++++++++++++
 .../fnexecution/control/package-info.java       |  23 +++
 .../FnApiControlClientPoolServiceTest.java      |  65 +++++++
 .../control/FnApiControlClientTest.java         | 139 +++++++++++++++
 .../control/SdkHarnessClientTest.java           |  96 ++++++++++
 11 files changed, 740 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
index 7546851..811444c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
@@ -39,7 +39,11 @@ import org.slf4j.LoggerFactory;
  * connections).
  *
  * <p>This low-level client is responsible only for correlating requests with responses.
+ *
+ * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this
+ *     functionality.
  */
+@Deprecated
 class FnApiControlClient implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
index fd28040..21fc4f7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
@@ -24,7 +24,13 @@ import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** A Fn API control service which adds incoming SDK harness connections to a pool. */
+/**
+ * A Fn API control service which adds incoming SDK harness connections to a pool.
+ *
+ * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this
+ *     functionality.
+ */
+@Deprecated
 public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase {
   private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
index bfd1837..091dea1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
@@ -31,7 +31,11 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi;
  *
  * <p>This provides a Java-friendly wrapper around {@link FnApiControlClient} and {@link
  * FnDataReceiver}, which handle lower-level gRPC message wrangling.
+ *
+ * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this
+ *     functionality.
  */
+@Deprecated
 public class SdkHarnessClient {
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
index 1941f49..6ff08b7 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -97,6 +97,17 @@
       <artifactId>guava</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Test dependencies -->
     <dependency>
       <groupId>junit</groupId>
@@ -116,5 +127,9 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
new file mode 100644
index 0000000..8133988
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A client for the control plane of an SDK harness, which can issue requests to it over the Fn API.
+ *
+ * <p>This class presents a low-level Java API de-inverting the Fn API's gRPC layer.
+ *
+ * <p>The Fn API is inverted so the runner is the server and the SDK harness is the client, for
+ * firewalling reasons (the runner may execute in a more privileged environment forbidding outbound
+ * connections).
+ *
+ * <p>This low-level client is responsible only for correlating requests with responses.
+ */
+class FnApiControlClient implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class);
+
+  // All writes to this StreamObserver need to be synchronized.
+  private final StreamObserver<BeamFnApi.InstructionRequest> requestReceiver;
+  private final ResponseStreamObserver responseObserver = new ResponseStreamObserver();
+  private final Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequests;
+  private volatile boolean isClosed;
+
+  private FnApiControlClient(StreamObserver<BeamFnApi.InstructionRequest> requestReceiver) {
+    this.requestReceiver = requestReceiver;
+    this.outstandingRequests = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns a {@link FnApiControlClient} which will submit its requests to the provided
+   * observer.
+   *
+   * <p>It is the responsibility of the caller to register this object as an observer of incoming
+   * responses (this will generally be done as part of fulfilling the contract of a gRPC service).
+   */
+  public static FnApiControlClient forRequestObserver(
+      StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
+    return new FnApiControlClient(requestObserver);
+  }
+
+  public synchronized ListenableFuture<BeamFnApi.InstructionResponse> handle(
+      BeamFnApi.InstructionRequest request) {
+    LOG.debug("Sending InstructionRequest {}", request);
+    SettableFuture<BeamFnApi.InstructionResponse> resultFuture = SettableFuture.create();
+    outstandingRequests.put(request.getInstructionId(), resultFuture);
+    requestReceiver.onNext(request);
+    return resultFuture;
+  }
+
+  StreamObserver<BeamFnApi.InstructionResponse> asResponseObserver() {
+    return responseObserver;
+  }
+
+  @Override
+  public void close() {
+    closeAndTerminateOutstandingRequests(new IllegalStateException("Runner closed connection"));
+  }
+
+  /** Closes this client and terminates any outstanding requests exceptionally. */
+  private synchronized void closeAndTerminateOutstandingRequests(Throwable cause) {
+    if (isClosed) {
+      return;
+    }
+
+    // Make a copy of the map to make the view of the outstanding requests consistent.
+    Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequestsCopy =
+        new ConcurrentHashMap<>(outstandingRequests);
+    outstandingRequests.clear();
+    isClosed = true;
+
+    if (outstandingRequestsCopy.isEmpty()) {
+      requestReceiver.onCompleted();
+      return;
+    }
+    requestReceiver.onError(
+        new StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage())));
+
+    LOG.error(
+        "{} closed, clearing outstanding requests {}",
+        FnApiControlClient.class.getSimpleName(),
+        outstandingRequestsCopy);
+    for (SettableFuture<BeamFnApi.InstructionResponse> outstandingRequest :
+        outstandingRequestsCopy.values()) {
+      outstandingRequest.setException(cause);
+    }
+  }
+
+  /**
+   * A private view of this class as a {@link StreamObserver} for connecting as a gRPC listener.
+   */
+  private class ResponseStreamObserver implements StreamObserver<BeamFnApi.InstructionResponse> {
+    /**
+     * Processes an incoming {@link BeamFnApi.InstructionResponse} by correlating it with the
+     * corresponding {@link BeamFnApi.InstructionRequest} and completes the future that was returned
+     * by {@link #handle}.
+     */
+    @Override
+    public void onNext(BeamFnApi.InstructionResponse response) {
+      LOG.debug("Received InstructionResponse {}", response);
+      SettableFuture<BeamFnApi.InstructionResponse> completableFuture =
+          outstandingRequests.remove(response.getInstructionId());
+      if (completableFuture != null) {
+        completableFuture.set(response);
+      }
+    }
+
+    /** */
+    @Override
+    public void onCompleted() {
+      closeAndTerminateOutstandingRequests(
+          new IllegalStateException("SDK harness closed connection"));
+    }
+
+    @Override
+    public void onError(Throwable cause) {
+      LOG.error("{} received error {}", FnApiControlClient.class.getSimpleName(), cause);
+      closeAndTerminateOutstandingRequests(cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
new file mode 100644
index 0000000..37fae00
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.BlockingQueue;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Fn API control service that places every incoming SDK harness connection into a client pool.
+ */
+public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class);
+
+  private final BlockingQueue<FnApiControlClient> clientPool;
+
+  private FnApiControlClientPoolService(BlockingQueue<FnApiControlClient> clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  /**
+   * Builds a {@link FnApiControlClientPoolService} that puts a client for each new SDK harness
+   * connection into the given queue.
+   */
+  public static FnApiControlClientPoolService offeringClientsToPool(
+      BlockingQueue<FnApiControlClient> clientPool) {
+    return new FnApiControlClientPoolService(clientPool);
+  }
+
+  /**
+   * Invoked by gRPC once per incoming SDK harness connection; wraps the connection in a {@link
+   * FnApiControlClient} and puts that client into the pool.
+   *
+   * <p>Note: currently does not distinguish what sort of SDK it is, so a separate instance is
+   * required for each.
+   */
+  @Override
+  public StreamObserver<BeamFnApi.InstructionResponse> control(
+      StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
+    LOGGER.info("Beam Fn Control client connected.");
+    FnApiControlClient connectedClient = FnApiControlClient.forRequestObserver(requestObserver);
+    addToPool(connectedClient);
+    return connectedClient.asResponseObserver();
+  }
+
+  /**
+   * Puts the client into the pool, waiting if the queue requires it. An interrupt restores the
+   * thread's interrupt flag and is rethrown wrapped in a {@link RuntimeException}.
+   */
+  private void addToPool(FnApiControlClient connectedClient) {
+    try {
+      clientPool.put(connectedClient);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
new file mode 100644
index 0000000..5b47a58
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.runners.fnexecution.data.FnDataReceiver;
+
+/**
+ * A high-level client for an SDK harness.
+ *
+ * <p>This provides a Java-friendly wrapper around {@link FnApiControlClient} and {@link
+ * FnDataReceiver}, which handle lower-level gRPC message wrangling.
+ */
+public class SdkHarnessClient {
+
+  /**
+   * A supply of unique identifiers, used internally. These must be unique across all Fn API
+   * clients.
+   */
+  public interface IdGenerator {
+    String getId();
+  }
+
+  /** A supply of unique identifiers that are simply incrementing longs. */
+  private static class CountingIdGenerator implements IdGenerator {
+    private final AtomicLong nextId = new AtomicLong(0L);
+
+    @Override
+    public String getId() {
+      return String.valueOf(nextId.incrementAndGet());
+    }
+  }
+
+  /**
+   * An active bundle for a particular {@link
+   * BeamFnApi.ProcessBundleDescriptor}.
+   */
+  @AutoValue
+  public abstract static class ActiveBundle<InputT> {
+    /** The identifier of this bundle, as passed to {@link #create}. */
+    public abstract String getBundleId();
+
+    /** The future response to the bundle's processing request. */
+    public abstract Future<BeamFnApi.ProcessBundleResponse> getBundleResponse();
+
+    /** The receiver associated with this bundle's input. */
+    public abstract FnDataReceiver<InputT> getInputReceiver();
+
+    /** Creates an {@link ActiveBundle} holding the given id, response future and receiver. */
+    public static <InputT> ActiveBundle<InputT> create(
+        String bundleId,
+        Future<BeamFnApi.ProcessBundleResponse> response,
+        FnDataReceiver<InputT> dataReceiver) {
+      // Diamond rather than a raw type, so the generated class is instantiated with InputT.
+      return new AutoValue_SdkHarnessClient_ActiveBundle<>(bundleId, response, dataReceiver);
+    }
+  }
+
+  private final IdGenerator idGenerator;
+  private final FnApiControlClient fnApiControlClient;
+
+  private SdkHarnessClient(
+      FnApiControlClient fnApiControlClient,
+      IdGenerator idGenerator) {
+    this.idGenerator = idGenerator;
+    this.fnApiControlClient = fnApiControlClient;
+  }
+
+  /**
+   * Creates a client for a particular SDK harness. It is the responsibility of the caller to ensure
+   * that these correspond to the same SDK harness, so control plane and data plane messages can be
+   * correctly associated.
+   */
+  public static SdkHarnessClient usingFnApiClient(FnApiControlClient fnApiControlClient) {
+    return new SdkHarnessClient(fnApiControlClient, new CountingIdGenerator());
+  }
+
+  /**
+   * Returns a new {@link SdkHarnessClient} that uses the same control client as this one but
+   * draws its identifiers from the given {@link IdGenerator}. This instance is not modified.
+   */
+  public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
+    return new SdkHarnessClient(fnApiControlClient, idGenerator);
+  }
+
+  /**
+   * Registers a {@link BeamFnApi.ProcessBundleDescriptor} for future
+   * processing.
+   *
+   * <p>A client may block on the result future, but may also proceed without blocking.
+   */
+  public Future<BeamFnApi.RegisterResponse> register(
+      Iterable<BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors) {
+
+    // TODO: validate that all the necessary data endpoints are known
+
+    ListenableFuture<BeamFnApi.InstructionResponse> genericResponse =
+        fnApiControlClient.handle(
+            BeamFnApi.InstructionRequest.newBuilder()
+                .setInstructionId(idGenerator.getId())
+                .setRegister(
+                    BeamFnApi.RegisterRequest.newBuilder()
+                        .addAllProcessBundleDescriptor(processBundleDescriptors)
+                        .build())
+                .build());
+
+    return Futures.transform(
+        genericResponse,
+        new Function<BeamFnApi.InstructionResponse, BeamFnApi.RegisterResponse>() {
+          @Override
+          public BeamFnApi.RegisterResponse apply(BeamFnApi.InstructionResponse input) {
+            return input.getRegister();
+          }
+        });
+  }
+
+  /**
+   * Start a new bundle for the given {@link
+   * BeamFnApi.ProcessBundleDescriptor} identifier.
+   *
+   * <p>The input channels for the returned {@link ActiveBundle} are derived from the
+   * instructions in the {@link BeamFnApi.ProcessBundleDescriptor}.
+   *
+   * <p>Note: the input receiver of the returned bundle is currently a placeholder whose {@code
+   * accept} always throws {@link UnsupportedOperationException}.
+   */
+  public ActiveBundle newBundle(String processBundleDescriptorId) {
+    String bundleId = idGenerator.getId();
+
+    // TODO: acquire an input receiver from appropriate FnDataService
+    FnDataReceiver dataReceiver = new FnDataReceiver() {
+      @Override
+      public void accept(Object input) throws Exception {
+        throw new UnsupportedOperationException("Placeholder FnDataReceiver cannot accept data.");
+      }
+
+      @Override
+      public void close() throws IOException {
+        // noop
+      }
+    };
+
+    ListenableFuture<BeamFnApi.InstructionResponse> genericResponse =
+        fnApiControlClient.handle(
+            BeamFnApi.InstructionRequest.newBuilder()
+                // The control client correlates responses with requests by instruction id, so the
+                // request must carry the bundle's id for its response future to complete.
+                .setInstructionId(bundleId)
+                .setProcessBundle(
+                    BeamFnApi.ProcessBundleRequest.newBuilder()
+                        .setProcessBundleDescriptorReference(processBundleDescriptorId))
+                .build());
+
+    ListenableFuture<BeamFnApi.ProcessBundleResponse> specificResponse =
+        Futures.transform(
+            genericResponse,
+            new Function<BeamFnApi.InstructionResponse, BeamFnApi.ProcessBundleResponse>() {
+              @Override
+              public BeamFnApi.ProcessBundleResponse apply(BeamFnApi.InstructionResponse input) {
+                return input.getProcessBundle();
+              }
+            });
+
+    return ActiveBundle.create(bundleId, specificResponse, dataReceiver);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java
new file mode 100644
index 0000000..791faa2
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities for a Beam runner to interact with the Fn API {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc Control Service} via Java abstractions.
+ */
+package org.apache.beam.runners.fnexecution.control;

http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
new file mode 100644
index 0000000..9392ee0
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link FnApiControlClientPoolService}. */
+@RunWith(JUnit4.class)
+public class FnApiControlClientPoolServiceTest {
+
+  // A LinkedBlockingQueue keeps the test single-threaded and straight-line; production code is
+  // more likely to use a SynchronousQueue to pair incoming connections with server threads.
+  private final BlockingQueue<FnApiControlClient> pool = new LinkedBlockingQueue<>();
+  private FnApiControlClientPoolService controlService =
+      FnApiControlClientPoolService.offeringClientsToPool(pool);
+
+  @Test
+  public void testIncomingConnection() throws Exception {
+    StreamObserver<BeamFnApi.InstructionRequest> toHarness = mock(StreamObserver.class);
+    StreamObserver<BeamFnApi.InstructionResponse> fromHarness = controlService.control(toHarness);
+
+    FnApiControlClient pooledClient = pool.take();
+
+    // Requests issued through the pooled client must reach the observer passed to control().
+    String instructionId = "fakeInstruction";
+    BeamFnApi.InstructionRequest request =
+        BeamFnApi.InstructionRequest.newBuilder().setInstructionId(instructionId).build();
+    ListenableFuture<BeamFnApi.InstructionResponse> pendingResponse =
+        pooledClient.handle(request);
+    verify(toHarness).onNext(any(BeamFnApi.InstructionRequest.class));
+    assertThat(pendingResponse.isDone(), is(false));
+
+    // A response pushed into the observer returned by control() must complete that same future.
+    BeamFnApi.InstructionResponse response =
+        BeamFnApi.InstructionResponse.newBuilder().setInstructionId(instructionId).build();
+    fromHarness.onNext(response);
+    pendingResponse.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java
new file mode 100644
index 0000000..4732f5e
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests request forwarding and future completion in {@link FnApiControlClient}. */
+@RunWith(JUnit4.class)
+public class FnApiControlClientTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock public StreamObserver<BeamFnApi.InstructionRequest> mockObserver;
+  private FnApiControlClient client;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    client = FnApiControlClient.forRequestObserver(mockObserver);
+  }
+
+  @Test
+  public void testRequestSent() {
+    BeamFnApi.InstructionRequest request =
+        BeamFnApi.InstructionRequest.newBuilder().setInstructionId("instructionId").build();
+    client.handle(request);
+    verify(mockObserver).onNext(any(BeamFnApi.InstructionRequest.class));
+  }
+
+  @Test
+  public void testRequestSuccess() throws Exception {
+    String instructionId = "successfulInstruction";
+    BeamFnApi.InstructionRequest request =
+        BeamFnApi.InstructionRequest.newBuilder().setInstructionId(instructionId).build();
+    Future<BeamFnApi.InstructionResponse> pending = client.handle(request);
+
+    // Deliver a response carrying the same instruction id as the outstanding request.
+    BeamFnApi.InstructionResponse reply =
+        BeamFnApi.InstructionResponse.newBuilder().setInstructionId(instructionId).build();
+    client.asResponseObserver().onNext(reply);
+
+    assertThat(pending.get().getInstructionId(), equalTo(instructionId));
+  }
+
+  @Test
+  public void testUnknownResponseIgnored() throws Exception {
+    BeamFnApi.InstructionRequest request =
+        BeamFnApi.InstructionRequest.newBuilder().setInstructionId("actualInstruction").build();
+    ListenableFuture<BeamFnApi.InstructionResponse> pending = client.handle(request);
+
+    // This response names an instruction id that was never handed to the client.
+    BeamFnApi.InstructionResponse stray =
+        BeamFnApi.InstructionResponse.newBuilder().setInstructionId("unknownInstruction").build();
+    client.asResponseObserver().onNext(stray);
+
+    // The outstanding request must be left untouched: neither completed nor cancelled.
+    assertThat(pending.isDone(), is(false));
+    assertThat(pending.isCancelled(), is(false));
+  }
+
+  @Test
+  public void testOnCompletedCancelsOutstanding() throws Exception {
+    String instructionId = "clientHangUpInstruction";
+    BeamFnApi.InstructionRequest request =
+        BeamFnApi.InstructionRequest.newBuilder().setInstructionId(instructionId).build();
+    Future<BeamFnApi.InstructionResponse> pending = client.handle(request);
+
+    client.asResponseObserver().onCompleted();
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    thrown.expectMessage("closed");
+    pending.get();
+  }
+
+  @Test
+  public void testOnErrorCancelsOutstanding() throws Exception {
+    // Local exception type, so the cause seen below can only be the one passed to onError.
+    class FrazzleException extends Exception {}
+
+    BeamFnApi.InstructionRequest request =
+        BeamFnApi.InstructionRequest.newBuilder().setInstructionId("errorInstruction").build();
+    Future<BeamFnApi.InstructionResponse> pending = client.handle(request);
+    client.asResponseObserver().onError(new FrazzleException());
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(FrazzleException.class));
+    pending.get();
+  }
+
+  @Test
+  public void testCloseCancelsOutstanding() throws Exception {
+    String instructionId = "serverCloseInstruction";
+    BeamFnApi.InstructionRequest request =
+        BeamFnApi.InstructionRequest.newBuilder().setInstructionId(instructionId).build();
+    Future<BeamFnApi.InstructionResponse> pending = client.handle(request);
+
+    client.close();
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    thrown.expectMessage("closed");
+    pending.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
new file mode 100644
index 0000000..09437c7
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.Future;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link SdkHarnessClient} running against a mocked {@link FnApiControlClient}. */
+@RunWith(JUnit4.class)
+public class SdkHarnessClientTest {
+
+  @Mock public FnApiControlClient fnApiControlClient;
+
+  private SdkHarnessClient sdkHarnessClient;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    sdkHarnessClient = SdkHarnessClient.usingFnApiClient(fnApiControlClient);
+  }
+
+  @Test
+  public void testRegisterDoesNotCrash() throws Exception {
+    SettableFuture<BeamFnApi.InstructionResponse> controlReply = SettableFuture.create();
+    when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+        .thenReturn(controlReply);
+
+    BeamFnApi.ProcessBundleDescriptor first =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor1").build();
+    BeamFnApi.ProcessBundleDescriptor second =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("descriptor2").build();
+    Future<BeamFnApi.RegisterResponse> registered =
+        sdkHarnessClient.register(ImmutableList.of(first, second));
+
+    // Pairing a RegisterRequest with its RegisterResponse is the job of the underlying
+    // FnApiControlClient; SdkHarnessClient only wraps the request and unwraps the response.
+    //
+    // The response is a default instance, so there is nothing to assert on yet. The test
+    // keeps the shape it should have if/when the response carries meaningful content.
+    BeamFnApi.InstructionResponse wrapped =
+        BeamFnApi.InstructionResponse.newBuilder()
+            .setRegister(BeamFnApi.RegisterResponse.getDefaultInstance())
+            .build();
+    controlReply.set(wrapped);
+    registered.get();
+  }
+
+  @Test
+  public void testNewBundleNoDataDoesNotCrash() throws Exception {
+    SettableFuture<BeamFnApi.InstructionResponse> controlReply = SettableFuture.create();
+    when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+        .thenReturn(controlReply);
+
+    SdkHarnessClient.ActiveBundle bundle = sdkHarnessClient.newBundle("descriptor1");
+
+    // Pairing a ProcessBundleRequest with its ProcessBundleResponse is the job of the
+    // underlying FnApiControlClient; SdkHarnessClient only wraps the request and unwraps
+    // the response.
+    //
+    // The response is a default instance, so there is nothing to assert on yet. The test
+    // keeps the shape it should have if/when the response carries meaningful content.
+    BeamFnApi.InstructionResponse wrapped =
+        BeamFnApi.InstructionResponse.newBuilder()
+            .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
+            .build();
+    controlReply.set(wrapped);
+    // Blocks until the bundle's response future completes; an exception here fails the test.
+    bundle.getBundleResponse().get();
+  }
+}