You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/11/09 00:43:55 UTC
[2/3] beam git commit: Fork Control Clients into java-fn-execution
Fork Control Clients into java-fn-execution
Deprecate versions in runners-core. Runner-side portability APIs should
not have a dependency edge to an SDK, and use of the java-fn-execution
package ensures that.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ed655be
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ed655be
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ed655be
Branch: refs/heads/master
Commit: 9ed655be780630e1218d185bd0d2ebfea099b988
Parents: 29399fd
Author: Thomas Groh <tg...@google.com>
Authored: Mon Nov 6 15:03:17 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 8 16:43:42 2017 -0800
----------------------------------------------------------------------
.../runners/core/fn/FnApiControlClient.java | 4 +
.../core/fn/FnApiControlClientPoolService.java | 8 +-
.../beam/runners/core/fn/SdkHarnessClient.java | 4 +
runners/java-fn-execution/pom.xml | 15 ++
.../fnexecution/control/FnApiControlClient.java | 148 ++++++++++++++++
.../control/FnApiControlClientPoolService.java | 66 +++++++
.../fnexecution/control/SdkHarnessClient.java | 173 +++++++++++++++++++
.../fnexecution/control/package-info.java | 23 +++
.../FnApiControlClientPoolServiceTest.java | 65 +++++++
.../control/FnApiControlClientTest.java | 139 +++++++++++++++
.../control/SdkHarnessClientTest.java | 96 ++++++++++
11 files changed, 740 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
index 7546851..811444c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
@@ -39,7 +39,11 @@ import org.slf4j.LoggerFactory;
* connections).
*
* <p>This low-level client is responsible only for correlating requests with responses.
+ *
+ * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this
+ * functionality.
*/
+@Deprecated
class FnApiControlClient implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
index fd28040..21fc4f7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
@@ -24,7 +24,13 @@ import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** A Fn API control service which adds incoming SDK harness connections to a pool. */
+/**
+ * A Fn API control service which adds incoming SDK harness connections to a pool.
+ *
+ * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this
+ * functionality.
+ */
+@Deprecated
public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase {
private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
index bfd1837..091dea1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
@@ -31,7 +31,11 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi;
*
* <p>This provides a Java-friendly wrapper around {@link FnApiControlClient} and {@link
* FnDataReceiver}, which handle lower-level gRPC message wrangling.
+ *
+ * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this
+ * functionality.
*/
+@Deprecated
public class SdkHarnessClient {
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/pom.xml
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
index 1941f49..6ff08b7 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -97,6 +97,17 @@
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
@@ -116,5 +127,9 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
new file mode 100644
index 0000000..8133988
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A client for the control plane of an SDK harness, which can issue requests to it over the Fn API.
+ *
+ * <p>This class presents a low-level Java API de-inverting the Fn API's gRPC layer.
+ *
+ * <p>The Fn API is inverted so the runner is the server and the SDK harness is the client, for
+ * firewalling reasons (the runner may execute in a more privileged environment forbidding outbound
+ * connections).
+ *
+ * <p>This low-level client is responsible only for correlating requests with responses; it does so
+ * by matching the instruction id of each response against the requests that are still outstanding.
+ */
+class FnApiControlClient implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class);
+
+  // All writes to this StreamObserver need to be synchronized.
+  private final StreamObserver<BeamFnApi.InstructionRequest> requestReceiver;
+  private final ResponseStreamObserver responseObserver = new ResponseStreamObserver();
+  // Requests that have been sent and not yet answered, keyed by instruction id.
+  private final Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequests;
+  // Set to true exactly once, while holding this object's monitor, when the client is closed.
+  private volatile boolean isClosed;
+
+  private FnApiControlClient(StreamObserver<BeamFnApi.InstructionRequest> requestReceiver) {
+    this.requestReceiver = requestReceiver;
+    this.outstandingRequests = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns a {@link FnApiControlClient} which will submit its requests to the provided
+   * observer.
+   *
+   * <p>It is the responsibility of the caller to register this object as an observer of incoming
+   * responses (this will generally be done as part of fulfilling the contract of a gRPC service).
+   */
+  public static FnApiControlClient forRequestObserver(
+      StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
+    return new FnApiControlClient(requestObserver);
+  }
+
+  /**
+   * Sends {@code request} to the request observer and returns a future that is completed with the
+   * response whose instruction id equals {@code request.getInstructionId()}.
+   *
+   * <p>If this client has already been closed, the request is not sent and the returned future
+   * has already failed with an {@link IllegalStateException}. Registering it would leave a future
+   * that nothing can ever complete, and the request stream has already been terminated.
+   *
+   * @param request the instruction to send
+   * @return a future for the matching response; it fails if the client is or becomes closed
+   *     before a response arrives
+   */
+  public synchronized ListenableFuture<BeamFnApi.InstructionResponse> handle(
+      BeamFnApi.InstructionRequest request) {
+    SettableFuture<BeamFnApi.InstructionResponse> resultFuture = SettableFuture.create();
+    // Both this method and the close path hold the monitor, so this check cannot race a close.
+    if (isClosed) {
+      resultFuture.setException(
+          new IllegalStateException(
+              "Cannot send instruction " + request.getInstructionId() + ": client is closed"));
+      return resultFuture;
+    }
+    LOG.debug("Sending InstructionRequest {}", request);
+    // Register before sending so a response can never arrive for an unknown id.
+    outstandingRequests.put(request.getInstructionId(), resultFuture);
+    requestReceiver.onNext(request);
+    return resultFuture;
+  }
+
+  /** Returns the observer that must receive the responses for requests sent by this client. */
+  StreamObserver<BeamFnApi.InstructionResponse> asResponseObserver() {
+    return responseObserver;
+  }
+
+  /**
+   * Closes this client from the runner side, failing any outstanding requests with an
+   * {@link IllegalStateException}. Closing an already-closed client has no effect.
+   */
+  @Override
+  public void close() {
+    closeAndTerminateOutstandingRequests(new IllegalStateException("Runner closed connection"));
+  }
+
+  /**
+   * Closes this client and terminates any outstanding requests exceptionally.
+   *
+   * <p>Only the first call has an effect. When nothing is outstanding the request stream is
+   * completed normally; otherwise it is terminated with a {@code CANCELLED} status carrying the
+   * message of {@code cause}, and every outstanding future is failed with {@code cause}.
+   */
+  private synchronized void closeAndTerminateOutstandingRequests(Throwable cause) {
+    if (isClosed) {
+      return;
+    }
+
+    // Make a copy of the map to make the view of the outstanding requests consistent.
+    Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequestsCopy =
+        new ConcurrentHashMap<>(outstandingRequests);
+    outstandingRequests.clear();
+    isClosed = true;
+
+    if (outstandingRequestsCopy.isEmpty()) {
+      requestReceiver.onCompleted();
+      return;
+    }
+    requestReceiver.onError(
+        new StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage())));
+
+    LOG.error(
+        "{} closed, clearing outstanding requests {}",
+        FnApiControlClient.class.getSimpleName(),
+        outstandingRequestsCopy);
+    for (SettableFuture<BeamFnApi.InstructionResponse> outstandingRequest :
+        outstandingRequestsCopy.values()) {
+      outstandingRequest.setException(cause);
+    }
+  }
+
+  /**
+   * A private view of this class as a {@link StreamObserver} for connecting as a gRPC listener.
+   */
+  private class ResponseStreamObserver implements StreamObserver<BeamFnApi.InstructionResponse> {
+    /**
+     * Processes an incoming {@link BeamFnApi.InstructionResponse} by correlating it with the
+     * corresponding {@link BeamFnApi.InstructionRequest} and completes the future that was
+     * returned by {@link FnApiControlClient#handle}.
+     *
+     * <p>A response whose instruction id matches no outstanding request is ignored.
+     */
+    @Override
+    public void onNext(BeamFnApi.InstructionResponse response) {
+      LOG.debug("Received InstructionResponse {}", response);
+      SettableFuture<BeamFnApi.InstructionResponse> completableFuture =
+          outstandingRequests.remove(response.getInstructionId());
+      if (completableFuture != null) {
+        completableFuture.set(response);
+      }
+    }
+
+    /**
+     * Closes the enclosing client when the response stream completes, failing any outstanding
+     * requests with an {@link IllegalStateException}.
+     */
+    @Override
+    public void onCompleted() {
+      closeAndTerminateOutstandingRequests(
+          new IllegalStateException("SDK harness closed connection"));
+    }
+
+    /**
+     * Closes the enclosing client when the response stream fails, failing any outstanding
+     * requests with {@code cause}.
+     */
+    @Override
+    public void onError(Throwable cause) {
+      // The Throwable is passed as the trailing argument without its own placeholder so that
+      // SLF4J logs it as the exception (with stack trace) instead of leaving a "{}" unfilled.
+      LOG.error("{} received error", FnApiControlClient.class.getSimpleName(), cause);
+      closeAndTerminateOutstandingRequests(cause);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
new file mode 100644
index 0000000..37fae00
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.BlockingQueue;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Fn API control service that turns each incoming SDK harness connection into a
+ * {@link FnApiControlClient} and adds it to a pool.
+ */
+public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class);
+
+  private final BlockingQueue<FnApiControlClient> clientPool;
+
+  private FnApiControlClientPoolService(BlockingQueue<FnApiControlClient> clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  /**
+   * Creates a new {@link FnApiControlClientPoolService} which puts a client for every new SDK
+   * harness connection into {@code clientPool}.
+   */
+  public static FnApiControlClientPoolService offeringClientsToPool(
+      BlockingQueue<FnApiControlClient> clientPool) {
+    return new FnApiControlClientPoolService(clientPool);
+  }
+
+  /**
+   * Called by gRPC for each incoming connection from an SDK harness; wraps the connection in a
+   * {@link FnApiControlClient} and enqueues that client in the pool.
+   *
+   * <p>Note: currently does not distinguish what sort of SDK it is, so a separate instance is
+   * required for each.
+   *
+   * @param requestObserver the stream on which requests are sent to the connecting SDK harness
+   * @return the observer that receives the responses sent back by that SDK harness
+   * @throws RuntimeException if the thread is interrupted while adding the client to the pool;
+   *     the interrupt flag is restored first
+   */
+  @Override
+  public StreamObserver<BeamFnApi.InstructionResponse> control(
+      StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
+    LOGGER.info("Beam Fn Control client connected.");
+    final FnApiControlClient client = FnApiControlClient.forRequestObserver(requestObserver);
+    try {
+      clientPool.put(client);
+      return client.asResponseObserver();
+    } catch (InterruptedException interrupted) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(interrupted);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
new file mode 100644
index 0000000..5b47a58
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.runners.fnexecution.data.FnDataReceiver;
+
+/**
+ * A high-level client for an SDK harness.
+ *
+ * <p>This provides a Java-friendly wrapper around {@link FnApiControlClient} and {@link
+ * FnDataReceiver}, which handle lower-level gRPC message wrangling.
+ */
+public class SdkHarnessClient {
+
+  /**
+   * A supply of unique identifiers, used internally. These must be unique across all Fn API
+   * clients.
+   */
+  public interface IdGenerator {
+    /** Returns the next identifier. */
+    String getId();
+  }
+
+  /** A supply of unique identifiers that are simply incrementing longs. */
+  private static class CountingIdGenerator implements IdGenerator {
+    private final AtomicLong nextId = new AtomicLong(0L);
+
+    @Override
+    public String getId() {
+      return String.valueOf(nextId.incrementAndGet());
+    }
+  }
+
+  /**
+   * An active bundle for a particular {@link
+   * BeamFnApi.ProcessBundleDescriptor}.
+   */
+  @AutoValue
+  public abstract static class ActiveBundle<InputT> {
+    /** Returns the identifier assigned to this bundle. */
+    public abstract String getBundleId();
+
+    /** Returns the future for the SDK harness's response to processing this bundle. */
+    public abstract Future<BeamFnApi.ProcessBundleResponse> getBundleResponse();
+
+    /** Returns the receiver associated with this bundle for its input. */
+    public abstract FnDataReceiver<InputT> getInputReceiver();
+
+    /** Creates an {@link ActiveBundle} holding the given id, response future and receiver. */
+    public static <InputT> ActiveBundle<InputT> create(
+        String bundleId,
+        Future<BeamFnApi.ProcessBundleResponse> response,
+        FnDataReceiver<InputT> dataReceiver) {
+      // Use the diamond rather than a raw type so the result is checked as ActiveBundle<InputT>.
+      return new AutoValue_SdkHarnessClient_ActiveBundle<>(bundleId, response, dataReceiver);
+    }
+  }
+
+  private final IdGenerator idGenerator;
+  private final FnApiControlClient fnApiControlClient;
+
+  private SdkHarnessClient(
+      FnApiControlClient fnApiControlClient,
+      IdGenerator idGenerator) {
+    this.idGenerator = idGenerator;
+    this.fnApiControlClient = fnApiControlClient;
+  }
+
+  /**
+   * Creates a client for a particular SDK harness. It is the responsibility of the caller to ensure
+   * that these correspond to the same SDK harness, so control plane and data plane messages can be
+   * correctly associated.
+   */
+  public static SdkHarnessClient usingFnApiClient(FnApiControlClient fnApiControlClient) {
+    return new SdkHarnessClient(fnApiControlClient, new CountingIdGenerator());
+  }
+
+  /**
+   * Returns a new {@link SdkHarnessClient} that uses the same {@link FnApiControlClient} as this
+   * one but obtains its identifiers from {@code idGenerator}.
+   */
+  public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
+    return new SdkHarnessClient(fnApiControlClient, idGenerator);
+  }
+
+  /**
+   * Registers a {@link BeamFnApi.ProcessBundleDescriptor} for future
+   * processing.
+   *
+   * <p>A client may block on the result future, but may also proceed without blocking.
+   *
+   * @param processBundleDescriptors the descriptors to send in a single register request
+   * @return a future for the register portion of the SDK harness's response
+   */
+  public Future<BeamFnApi.RegisterResponse> register(
+      Iterable<BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors) {
+
+    // TODO: validate that all the necessary data endpoints are known
+
+    ListenableFuture<BeamFnApi.InstructionResponse> genericResponse =
+        fnApiControlClient.handle(
+            BeamFnApi.InstructionRequest.newBuilder()
+                .setInstructionId(idGenerator.getId())
+                .setRegister(
+                    BeamFnApi.RegisterRequest.newBuilder()
+                        .addAllProcessBundleDescriptor(processBundleDescriptors)
+                        .build())
+                .build());
+
+    return Futures.transform(
+        genericResponse,
+        new Function<BeamFnApi.InstructionResponse, BeamFnApi.RegisterResponse>() {
+          @Override
+          public BeamFnApi.RegisterResponse apply(BeamFnApi.InstructionResponse input) {
+            return input.getRegister();
+          }
+        });
+  }
+
+  /**
+   * Start a new bundle for the given {@link
+   * BeamFnApi.ProcessBundleDescriptor} identifier.
+   *
+   * <p>The input channels for the returned {@link ActiveBundle} are derived from the
+   * instructions in the {@link BeamFnApi.ProcessBundleDescriptor}. At present the returned
+   * bundle carries a placeholder receiver that rejects all input.
+   *
+   * @param processBundleDescriptorId the id of the descriptor the bundle should be processed with
+   * @return the active bundle, whose id is also the instruction id of the request that was sent
+   */
+  public ActiveBundle newBundle(String processBundleDescriptorId) {
+    String bundleId = idGenerator.getId();
+
+    // TODO: acquire an input receiver from appropriate FnDataService
+    FnDataReceiver<Object> dataReceiver =
+        new FnDataReceiver<Object>() {
+          @Override
+          public void accept(Object input) throws Exception {
+            throw new UnsupportedOperationException(
+                "Placeholder FnDataReceiver cannot accept data.");
+          }
+
+          @Override
+          public void close() throws IOException {
+            // noop
+          }
+        };
+
+    // The instruction id must be set: the control client keys the response future on it, and it
+    // is the id reported to the caller through ActiveBundle#getBundleId.
+    ListenableFuture<BeamFnApi.InstructionResponse> genericResponse =
+        fnApiControlClient.handle(
+            BeamFnApi.InstructionRequest.newBuilder()
+                .setInstructionId(bundleId)
+                .setProcessBundle(
+                    BeamFnApi.ProcessBundleRequest.newBuilder()
+                        .setProcessBundleDescriptorReference(processBundleDescriptorId))
+                .build());
+
+    ListenableFuture<BeamFnApi.ProcessBundleResponse> specificResponse =
+        Futures.transform(
+            genericResponse,
+            new Function<BeamFnApi.InstructionResponse, BeamFnApi.ProcessBundleResponse>() {
+              @Override
+              public BeamFnApi.ProcessBundleResponse apply(BeamFnApi.InstructionResponse input) {
+                return input.getProcessBundle();
+              }
+            });
+
+    return ActiveBundle.create(bundleId, specificResponse, dataReceiver);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java
new file mode 100644
index 0000000..791faa2
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities for a Beam runner to interact with the Fn API {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc Control Service} via java abstractions.
+ */
+package org.apache.beam.runners.fnexecution.control;
http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
new file mode 100644
index 0000000..9392ee0
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link FnApiControlClientPoolService}. */
+@RunWith(JUnit4.class)
+public class FnApiControlClientPoolServiceTest {
+
+  // For ease of straight-line testing, we use a LinkedBlockingQueue; in practice a SynchronousQueue
+  // for matching incoming connections and server threads is likely.
+  private final BlockingQueue<FnApiControlClient> pool = new LinkedBlockingQueue<>();
+  private FnApiControlClientPoolService controlService =
+      FnApiControlClientPoolService.offeringClientsToPool(pool);
+
+  /**
+   * Connects a mock request stream to the service and checks that the pooled client writes to
+   * that stream and is completed through the observer the service returned.
+   */
+  @Test
+  public void testIncomingConnection() throws Exception {
+    StreamObserver<BeamFnApi.InstructionRequest> outbound = mock(StreamObserver.class);
+    StreamObserver<BeamFnApi.InstructionResponse> inbound = controlService.control(outbound);
+
+    FnApiControlClient pooledClient = pool.take();
+
+    // Check that the client is wired up to the request channel
+    String instructionId = "fakeInstruction";
+    BeamFnApi.InstructionRequest request =
+        BeamFnApi.InstructionRequest.newBuilder().setInstructionId(instructionId).build();
+    ListenableFuture<BeamFnApi.InstructionResponse> pending = pooledClient.handle(request);
+    verify(outbound).onNext(any(BeamFnApi.InstructionRequest.class));
+    assertThat(pending.isDone(), is(false));
+
+    // Check that the response channel really came from the client
+    BeamFnApi.InstructionResponse response =
+        BeamFnApi.InstructionResponse.newBuilder().setInstructionId(instructionId).build();
+    inbound.onNext(response);
+    pending.get();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java
new file mode 100644
index 0000000..4732f5e
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link FnApiControlClient}. */
+@RunWith(JUnit4.class)
+public class FnApiControlClientTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock public StreamObserver<BeamFnApi.InstructionRequest> mockObserver;
+  private FnApiControlClient client;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    client = FnApiControlClient.forRequestObserver(mockObserver);
+  }
+
+  /** Builds an otherwise-empty request carrying the given instruction id. */
+  private static BeamFnApi.InstructionRequest requestFor(String instructionId) {
+    return BeamFnApi.InstructionRequest.newBuilder().setInstructionId(instructionId).build();
+  }
+
+  /** Builds an otherwise-empty response carrying the given instruction id. */
+  private static BeamFnApi.InstructionResponse responseFor(String instructionId) {
+    return BeamFnApi.InstructionResponse.newBuilder().setInstructionId(instructionId).build();
+  }
+
+  /** A handled request is forwarded to the request observer. */
+  @Test
+  public void testRequestSent() {
+    client.handle(requestFor("instructionId"));
+
+    verify(mockObserver).onNext(any(BeamFnApi.InstructionRequest.class));
+  }
+
+  /** A response with the request's instruction id completes the request's future. */
+  @Test
+  public void testRequestSuccess() throws Exception {
+    String instructionId = "successfulInstruction";
+
+    Future<BeamFnApi.InstructionResponse> pending = client.handle(requestFor(instructionId));
+    client.asResponseObserver().onNext(responseFor(instructionId));
+
+    BeamFnApi.InstructionResponse received = pending.get();
+
+    assertThat(received.getInstructionId(), equalTo(instructionId));
+  }
+
+  /** A response with an unknown instruction id leaves outstanding futures untouched. */
+  @Test
+  public void testUnknownResponseIgnored() throws Exception {
+    ListenableFuture<BeamFnApi.InstructionResponse> pending =
+        client.handle(requestFor("actualInstruction"));
+
+    client.asResponseObserver().onNext(responseFor("unknownInstruction"));
+
+    assertThat(pending.isDone(), is(false));
+    assertThat(pending.isCancelled(), is(false));
+  }
+
+  /** Completion of the response stream fails outstanding futures. */
+  @Test
+  public void testOnCompletedCancelsOutstanding() throws Exception {
+    Future<BeamFnApi.InstructionResponse> pending =
+        client.handle(requestFor("clientHangUpInstruction"));
+
+    client.asResponseObserver().onCompleted();
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    thrown.expectMessage("closed");
+    pending.get();
+  }
+
+  /** An error on the response stream fails outstanding futures with that error. */
+  @Test
+  public void testOnErrorCancelsOutstanding() throws Exception {
+    Future<BeamFnApi.InstructionResponse> pending =
+        client.handle(requestFor("errorInstruction"));
+
+    class FrazzleException extends Exception {}
+    client.asResponseObserver().onError(new FrazzleException());
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(FrazzleException.class));
+    pending.get();
+  }
+
+  /** Closing the client from the runner side fails outstanding futures. */
+  @Test
+  public void testCloseCancelsOutstanding() throws Exception {
+    Future<BeamFnApi.InstructionResponse> pending =
+        client.handle(requestFor("serverCloseInstruction"));
+
+    client.close();
+
+    thrown.expect(ExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    thrown.expectMessage("closed");
+    pending.get();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9ed655be/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
new file mode 100644
index 0000000..09437c7
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.Future;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link SdkHarnessClient} backed by a mocked {@link FnApiControlClient}. */
+@RunWith(JUnit4.class)
+public class SdkHarnessClientTest {
+ // Initialized by MockitoAnnotations.initMocks(this) in setup().
+ @Mock public FnApiControlClient fnApiControlClient;
+
+ private SdkHarnessClient sdkHarnessClient;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ sdkHarnessClient = SdkHarnessClient.usingFnApiClient(fnApiControlClient);
+ }
+
+ @Test
+ public void testRegisterDoesNotCrash() throws Exception {
+ String descriptorId1 = "descriptor1";
+ String descriptorId2 = "descriptor2";
+ // Stub the control client so any request yields a future that this test completes by hand.
+ SettableFuture<BeamFnApi.InstructionResponse> registerResponseFuture = SettableFuture.create();
+ when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+ .thenReturn(registerResponseFuture);
+
+ Future<BeamFnApi.RegisterResponse> responseFuture = sdkHarnessClient.register(
+ ImmutableList.of(
+ BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build(),
+ BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(descriptorId2).build()));
+
+ // Correlating the RegisterRequest and RegisterResponse is owned by the underlying
+ // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping
+ // the response.
+ //
+ // Currently there are no fields so there's nothing to check. This test is formulated
+ // to match the pattern it should have if/when the response is meaningful.
+ BeamFnApi.RegisterResponse response = BeamFnApi.RegisterResponse.getDefaultInstance();
+ registerResponseFuture.set(
+ BeamFnApi.InstructionResponse.newBuilder().setRegister(response).build());
+ responseFuture.get(); // the only check: this must return without throwing
+ }
+
+ @Test
+ public void testNewBundleNoDataDoesNotCrash() throws Exception {
+ String descriptorId1 = "descriptor1";
+ // Stub the control client so any request yields a future that this test completes by hand.
+ SettableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture =
+ SettableFuture.create();
+ when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+ .thenReturn(processBundleResponseFuture);
+
+ SdkHarnessClient.ActiveBundle activeBundle = sdkHarnessClient.newBundle(descriptorId1);
+
+ // Correlating the ProcessBundleRequest and ProcessBundleResponse is owned by the underlying
+ // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping
+ // the response.
+ //
+ // Currently there are no fields so there's nothing to check. This test is formulated
+ // to match the pattern it should have if/when the response is meaningful.
+ BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.getDefaultInstance();
+ processBundleResponseFuture.set(
+ BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
+ activeBundle.getBundleResponse().get(); // the only check: this must return without throwing
+ }
+}