You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/31 18:18:49 UTC

[1/2] beam git commit: [BEAM-1347] Wire up the BeamFnStateGrpcClientCache implementation into the ProcessBundleHandler

Repository: beam
Updated Branches:
  refs/heads/master 5cb7be78b -> c9653f270


[BEAM-1347] Wire up the BeamFnStateGrpcClientCache implementation into the ProcessBundleHandler

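Select the BeamFnStateClient for each bundle based on whether the State API service descriptor is populated: when it is set, use a client from the BeamFnStateGrpcClientCache that blocks until all outstanding state calls finish; otherwise, use a client that fails every state call.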


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7f8c6e85
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7f8c6e85
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7f8c6e85

Branch: refs/heads/master
Commit: 7f8c6e8541d37a4f4ee79bbc14e3f43a38d261c6
Parents: 5cb7be7
Author: Luke Cwik <lc...@google.com>
Authored: Wed Aug 30 13:56:40 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 31 09:10:13 2017 -0700

----------------------------------------------------------------------
 .../fn-api/src/main/proto/beam_fn_api.proto     |   5 +
 .../org/apache/beam/fn/harness/FnHarness.java   |   9 +-
 .../harness/control/ProcessBundleHandler.java   | 147 ++++++++++++++----
 .../control/ProcessBundleHandlerTest.java       | 153 ++++++++++++++++++-
 4 files changed, 279 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7f8c6e85/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 9da5afe..53d67bc 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -168,6 +168,11 @@ message ProcessBundleDescriptor {
 
   // (Required) A map from pipeline-scoped id to Environment.
   map<string, org.apache.beam.runner_api.v1.Environment> environments = 6;
+
+  // A descriptor describing the end point to use for State API
+  // calls. Required if the Runner intends to send remote references over the
+  // data plane or if any of the transforms rely on user state or side inputs.
+  ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
 // A request to process a given bundle.

http://git-wip-us.apache.org/repos/asf/beam/blob/7f8c6e85/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index a79ecca..49a7a88 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -29,6 +29,7 @@ import org.apache.beam.fn.harness.control.RegisterHandler;
 import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
 import org.apache.beam.fn.harness.fn.ThrowingFunction;
 import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
+import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.fn.harness.stream.StreamObserverFactory;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
@@ -109,11 +110,17 @@ public class FnHarness {
       BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient(
           options, channelFactory::forDescriptor, streamObserverFactory::from);
 
+      BeamFnStateGrpcClientCache beamFnStateGrpcClientCache = new BeamFnStateGrpcClientCache(
+          options,
+          IdGenerator::generate,
+          channelFactory::forDescriptor,
+          streamObserverFactory::from);
+
       ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(
           options,
           fnApiRegistry::getById,
           beamFnDataMultiplexer,
-          null /* beamFnStateClient */);
+          beamFnStateGrpcClientCache);
       handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER,
           fnApiRegistry::register);
       handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE,

http://git-wip-us.apache.org/repos/asf/beam/blob/7f8c6e85/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 67c4d67..e094487 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Phaser;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -40,7 +42,13 @@ import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
+import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
+import org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest.Builder;
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -84,7 +92,7 @@ public class ProcessBundleHandler {
   private final PipelineOptions options;
   private final Function<String, Message> fnApiRegistry;
   private final BeamFnDataClient beamFnDataClient;
-  private final BeamFnStateClient beamFnStateClient;
+  private final BeamFnStateGrpcClientCache beamFnStateGrpcClientCache;
   private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
   private final PTransformRunnerFactory defaultPTransformRunnerFactory;
 
@@ -93,8 +101,12 @@ public class ProcessBundleHandler {
       PipelineOptions options,
       Function<String, Message> fnApiRegistry,
       BeamFnDataClient beamFnDataClient,
-      BeamFnStateClient beamFnStateClient) {
-    this(options, fnApiRegistry, beamFnDataClient, beamFnStateClient, REGISTERED_RUNNER_FACTORIES);
+      BeamFnStateGrpcClientCache beamFnStateGrpcClientCache) {
+    this(options,
+        fnApiRegistry,
+        beamFnDataClient,
+        beamFnStateGrpcClientCache,
+        REGISTERED_RUNNER_FACTORIES);
   }
 
   @VisibleForTesting
@@ -102,12 +114,12 @@ public class ProcessBundleHandler {
       PipelineOptions options,
       Function<String, Message> fnApiRegistry,
       BeamFnDataClient beamFnDataClient,
-      BeamFnStateClient beamFnStateClient,
+      BeamFnStateGrpcClientCache beamFnStateGrpcClientCache,
       Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap) {
     this.options = options;
     this.fnApiRegistry = fnApiRegistry;
     this.beamFnDataClient = beamFnDataClient;
-    this.beamFnStateClient = beamFnStateClient;
+    this.beamFnStateGrpcClientCache = beamFnStateGrpcClientCache;
     this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap;
     this.defaultPTransformRunnerFactory = new PTransformRunnerFactory<Object>() {
       @Override
@@ -132,6 +144,7 @@ public class ProcessBundleHandler {
   }
 
   private void createRunnerAndConsumersForPTransformRecursively(
+      BeamFnStateClient beamFnStateClient,
       String pTransformId,
       RunnerApi.PTransform pTransform,
       Supplier<String> processBundleInstructionId,
@@ -152,6 +165,7 @@ public class ProcessBundleHandler {
 
       for (String consumingPTransformId : pCollectionIdsToConsumingPTransforms.get(pCollectionId)) {
         createRunnerAndConsumersForPTransformRecursively(
+            beamFnStateClient,
             consumingPTransformId,
             processBundleDescriptor.getTransformsMap().get(consumingPTransformId),
             processBundleInstructionId,
@@ -204,39 +218,110 @@ public class ProcessBundleHandler {
       }
     }
 
-    //
-    for (Map.Entry<String, RunnerApi.PTransform> entry
-        : bundleDescriptor.getTransformsMap().entrySet()) {
-      // Skip anything which isn't a root
-      // TODO: Remove source as a root and have it be triggered by the Runner.
-      if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn())
-          && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn())) {
-        continue;
+    // Instantiate a State API call handler depending on whether a State Api service descriptor
+    // was specified.
+    try (HandleStateCallsForBundle beamFnStateClient =
+        bundleDescriptor.hasStateApiServiceDescriptor()
+        ? new BlockTillStateCallsFinish(beamFnStateGrpcClientCache.forApiServiceDescriptor(
+            bundleDescriptor.getStateApiServiceDescriptor()))
+        : new FailAllStateCallsForBundle(request.getProcessBundle())) {
+      // Create a BeamFnStateClient
+      for (Map.Entry<String, RunnerApi.PTransform> entry
+          : bundleDescriptor.getTransformsMap().entrySet()) {
+        // Skip anything which isn't a root
+        // TODO: Remove source as a root and have it be triggered by the Runner.
+        if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn())
+            && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn())) {
+          continue;
+        }
+
+        createRunnerAndConsumersForPTransformRecursively(
+            beamFnStateClient,
+            entry.getKey(),
+            entry.getValue(),
+            request::getInstructionId,
+            bundleDescriptor,
+            pCollectionIdsToConsumingPTransforms,
+            pCollectionIdsToConsumers,
+            startFunctions::add,
+            finishFunctions::add);
+      }
+
+      // Already in reverse topological order so we don't need to do anything.
+      for (ThrowingRunnable startFunction : startFunctions) {
+        LOG.debug("Starting function {}", startFunction);
+        startFunction.run();
       }
 
-      createRunnerAndConsumersForPTransformRecursively(
-          entry.getKey(),
-          entry.getValue(),
-          request::getInstructionId,
-          bundleDescriptor,
-          pCollectionIdsToConsumingPTransforms,
-          pCollectionIdsToConsumers,
-          startFunctions::add,
-          finishFunctions::add);
+      // Need to reverse this since we want to call finish in topological order.
+      for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctions)) {
+        LOG.debug("Finishing function {}", finishFunction);
+        finishFunction.run();
+      }
     }
 
-    // Already in reverse topological order so we don't need to do anything.
-    for (ThrowingRunnable startFunction : startFunctions) {
-      LOG.debug("Starting function {}", startFunction);
-      startFunction.run();
+    return response;
+  }
+
+  /**
+   * A {@link BeamFnStateClient} which counts the number of outstanding {@link StateRequest}s and
+   * blocks till they are all finished.
+   */
+  private class BlockTillStateCallsFinish extends HandleStateCallsForBundle {
+    private final BeamFnStateClient beamFnStateClient;
+    private final Phaser phaser;
+    private int currentPhase;
+
+    private BlockTillStateCallsFinish(BeamFnStateClient beamFnStateClient) {
+      this.beamFnStateClient = beamFnStateClient;
+      this.phaser  = new Phaser(1 /* initial party is the process bundle handler */);
+      this.currentPhase = phaser.getPhase();
     }
 
-    // Need to reverse this since we want to call finish in topological order.
-    for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctions)) {
-      LOG.debug("Finishing function {}", finishFunction);
-      finishFunction.run();
+    @Override
+    public void close() throws Exception {
+      int unarrivedParties = phaser.getUnarrivedParties();
+      if (unarrivedParties > 0) {
+        LOG.debug("Waiting for {} parties to arrive before closing, current phase {}.",
+            unarrivedParties, currentPhase);
+      }
+      currentPhase = phaser.arriveAndAwaitAdvance();
     }
 
-    return response;
+    @Override
+    public void handle(StateRequest.Builder requestBuilder,
+        CompletableFuture<StateResponse> response) {
+      // Register each request with the phaser and arrive and deregister each time a request
+      // completes.
+      phaser.register();
+      response.whenComplete((stateResponse, throwable) -> phaser.arriveAndDeregister());
+      beamFnStateClient.handle(requestBuilder, response);
+    }
+  }
+
+  /**
+   * A {@link BeamFnStateClient} which fails all requests because the {@link ProcessBundleRequest}
+   * does not contain a State API {@link ApiServiceDescriptor}.
+   */
+  private class FailAllStateCallsForBundle extends HandleStateCallsForBundle {
+    private final ProcessBundleRequest request;
+
+    private FailAllStateCallsForBundle(ProcessBundleRequest request) {
+      this.request = request;
+    }
+
+    @Override
+    public void close() throws Exception {
+      // no-op
+    }
+
+    @Override
+    public void handle(Builder requestBuilder, CompletableFuture<StateResponse> response) {
+      throw new IllegalStateException(String.format("State API calls are unsupported because the "
+          + "ProcessBundleRequest %s does not support state.", request));
+    }
+  }
+
+  private abstract class HandleStateCallsForBundle implements AutoCloseable, BeamFnStateClient {
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7f8c6e85/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index d0e1faf..94fa6ad 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -21,14 +21,21 @@ package org.apache.beam.fn.harness.control;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.Message;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.PTransformRunnerFactory;
@@ -36,7 +43,11 @@ import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
+import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -50,7 +61,10 @@ import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /** Tests for {@link ProcessBundleHandler}. */
 @RunWith(JUnit4.class)
@@ -150,7 +164,7 @@ public class ProcessBundleHandlerTest {
         PipelineOptionsFactory.create(),
         fnApiRegistry::get,
         beamFnDataClient,
-        null /* beamFnStateClient */,
+        null /* beamFnStateGrpcClientCache */,
         ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() {
           @Override
           public Object createRunnerForPTransform(
@@ -190,7 +204,7 @@ public class ProcessBundleHandlerTest {
         PipelineOptionsFactory.create(),
         fnApiRegistry::get,
         beamFnDataClient,
-        null /* beamFnStateClient */,
+        null /* beamFnStateGrpcClientCache */,
         ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() {
           @Override
           public Object createRunnerForPTransform(
@@ -231,7 +245,7 @@ public class ProcessBundleHandlerTest {
         PipelineOptionsFactory.create(),
         fnApiRegistry::get,
         beamFnDataClient,
-        null /* beamFnStateClient */,
+        null /* beamFnStateGrpcClientCache */,
         ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() {
           @Override
           public Object createRunnerForPTransform(
@@ -258,6 +272,139 @@ public class ProcessBundleHandlerTest {
             .build());
   }
 
+  @Test
+  public void testPendingStateCallsBlockTillCompletion() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+            .putTransforms("2L", RunnerApi.PTransform.newBuilder()
+                .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                .build())
+            .setStateApiServiceDescriptor(ApiServiceDescriptor.getDefaultInstance())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    CompletableFuture<StateResponse> successfulResponse = new CompletableFuture<>();
+    CompletableFuture<StateResponse> unsuccessfulResponse = new CompletableFuture<>();
+
+    BeamFnStateGrpcClientCache mockBeamFnStateGrpcClient =
+        Mockito.mock(BeamFnStateGrpcClientCache.class);
+    BeamFnStateClient mockBeamFnStateClient = Mockito.mock(BeamFnStateClient.class);
+    when(mockBeamFnStateGrpcClient.forApiServiceDescriptor(any()))
+        .thenReturn(mockBeamFnStateClient);
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        StateRequest.Builder stateRequestBuilder =
+            (StateRequest.Builder) invocation.getArguments()[0];
+        CompletableFuture<StateResponse> completableFuture =
+            (CompletableFuture<StateResponse>) invocation.getArguments()[1];
+        new Thread() {
+          @Override
+          public void run() {
+            // Simulate sleeping which introduces a race which most of the time requires
+            // the ProcessBundleHandler to block.
+            Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+            switch (stateRequestBuilder.getInstructionReference()) {
+              case "SUCCESS":
+                completableFuture.complete(StateResponse.getDefaultInstance());
+                break;
+              case "FAIL":
+                completableFuture.completeExceptionally(new RuntimeException("TEST ERROR"));
+            }
+          }
+        }.start();
+        return null;
+      }
+    }).when(mockBeamFnStateClient).handle(any(), any());
+
+    ProcessBundleHandler handler = new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        fnApiRegistry::get,
+        beamFnDataClient,
+        mockBeamFnStateGrpcClient,
+        ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() {
+          @Override
+          public Object createRunnerForPTransform(
+              PipelineOptions pipelineOptions,
+              BeamFnDataClient beamFnDataClient,
+              BeamFnStateClient beamFnStateClient,
+              String pTransformId,
+              RunnerApi.PTransform pTransform,
+              Supplier<String> processBundleInstructionId,
+              Map<String, RunnerApi.PCollection> pCollections,
+              Map<String, RunnerApi.Coder> coders,
+              Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+              Consumer<ThrowingRunnable> addStartFunction,
+              Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
+            addStartFunction.accept(() -> doStateCalls(beamFnStateClient));
+            return null;
+          }
+
+          private void doStateCalls(BeamFnStateClient beamFnStateClient) {
+            beamFnStateClient.handle(StateRequest.newBuilder().setInstructionReference("SUCCESS"),
+                successfulResponse);
+            beamFnStateClient.handle(StateRequest.newBuilder().setInstructionReference("FAIL"),
+                unsuccessfulResponse);
+          }
+        }));
+    handler.processBundle(
+        BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(
+            BeamFnApi.ProcessBundleRequest.newBuilder()
+                .setProcessBundleDescriptorReference("1L"))
+            .build());
+
+    assertTrue(successfulResponse.isDone());
+    assertTrue(unsuccessfulResponse.isDone());
+  }
+
+  @Test
+  public void testStateCallsFailIfNoStateApiServiceDescriptorSpecified() throws Exception {
+    BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
+        BeamFnApi.ProcessBundleDescriptor.newBuilder()
+            .putTransforms("2L", RunnerApi.PTransform.newBuilder()
+                .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build())
+                .build())
+            .build();
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", processBundleDescriptor);
+
+    ProcessBundleHandler handler = new ProcessBundleHandler(
+        PipelineOptionsFactory.create(),
+        fnApiRegistry::get,
+        beamFnDataClient,
+        null /* beamFnStateGrpcClientCache */,
+        ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() {
+          @Override
+          public Object createRunnerForPTransform(
+              PipelineOptions pipelineOptions,
+              BeamFnDataClient beamFnDataClient,
+              BeamFnStateClient beamFnStateClient,
+              String pTransformId,
+              RunnerApi.PTransform pTransform,
+              Supplier<String> processBundleInstructionId,
+              Map<String, RunnerApi.PCollection> pCollections,
+              Map<String, RunnerApi.Coder> coders,
+              Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+              Consumer<ThrowingRunnable> addStartFunction,
+              Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
+            addStartFunction.accept(() -> doStateCalls(beamFnStateClient));
+            return null;
+          }
+
+          private void doStateCalls(BeamFnStateClient beamFnStateClient) {
+            thrown.expect(IllegalStateException.class);
+            thrown.expectMessage("State API calls are unsupported");
+            beamFnStateClient.handle(StateRequest.newBuilder().setInstructionReference("SUCCESS"),
+                new CompletableFuture<>());
+          }
+        }));
+    handler.processBundle(
+        BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(
+            BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L"))
+            .build());
+  }
+
+
   private static void throwException() {
     throw new IllegalStateException("TestException");
   }


[2/2] beam git commit: [BEAM-1347] Wire up the BeamFnStateGrpcClientCache implementation into the ProcessBundleHandler

Posted by lc...@apache.org.
[BEAM-1347] Wire up the BeamFnStateGrpcClientCache implementation into the ProcessBundleHandler

This closes #3795


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9653f27
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9653f27
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9653f27

Branch: refs/heads/master
Commit: c9653f27056d9d4dd85f7951f4edbd9da524b894
Parents: 5cb7be7 7f8c6e8
Author: Luke Cwik <lc...@google.com>
Authored: Thu Aug 31 11:18:40 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Aug 31 11:18:40 2017 -0700

----------------------------------------------------------------------
 .../fn-api/src/main/proto/beam_fn_api.proto     |   5 +
 .../org/apache/beam/fn/harness/FnHarness.java   |   9 +-
 .../harness/control/ProcessBundleHandler.java   | 147 ++++++++++++++----
 .../control/ProcessBundleHandlerTest.java       | 153 ++++++++++++++++++-
 4 files changed, 279 insertions(+), 35 deletions(-)
----------------------------------------------------------------------