You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/30 23:11:00 UTC

[1/2] beam git commit: [BEAM-1347] Implement a BeamFnStateClient which communicates over gRPC.

Repository: beam
Updated Branches:
  refs/heads/master 585440d22 -> 1cd87e325


[BEAM-1347] Implement a BeamFnStateClient which communicates over gRPC.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb2d6b58
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb2d6b58
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb2d6b58

Branch: refs/heads/master
Commit: fb2d6b58c065604daedf02a492457ce35bacfde2
Parents: 585440d
Author: Luke Cwik <lc...@google.com>
Authored: Tue Aug 29 18:31:39 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 30 16:10:25 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/fn/harness/IdGenerator.java |  33 +++
 .../state/BeamFnStateGrpcClientCache.java       | 173 ++++++++++++++
 .../apache/beam/fn/harness/IdGeneratorTest.java |  40 ++++
 .../state/BeamFnStateGrpcClientCacheTest.java   | 234 +++++++++++++++++++
 4 files changed, 480 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fb2d6b58/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/IdGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/IdGenerator.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/IdGenerator.java
new file mode 100644
index 0000000..1112f43
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/IdGenerator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An id generator.
+ *
+ * <p>This encapsulation exists to prevent usage of the wrong method on a shared {@link AtomicLong}.
+ */
+public final class IdGenerator {
+  private static final AtomicLong idGenerator = new AtomicLong(-1);
+
+  public static String generate() {
+    return Long.toString(idGenerator.getAndDecrement());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fb2d6b58/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
new file mode 100644
index 0000000..316e3e6
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+import org.apache.beam.fn.v1.BeamFnStateGrpc;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache of {@link BeamFnStateClient}s which handle Beam Fn State requests using gRPC.
+ *
+ * <p>TODO: Add the ability to close which cancels any pending and stops any future requests.
+ */
+public class BeamFnStateGrpcClientCache {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcClient.class);
+
+  private final ConcurrentMap<ApiServiceDescriptor, BeamFnStateClient> cache;
+  private final Function<ApiServiceDescriptor, ManagedChannel> channelFactory;
+  private final BiFunction<Function<StreamObserver<StateResponse>,
+      StreamObserver<StateRequest>>,
+      StreamObserver<StateResponse>,
+      StreamObserver<StateRequest>> streamObserverFactory;
+  private final PipelineOptions options;
+  private final Supplier<String> idGenerator;
+
+  public BeamFnStateGrpcClientCache(
+      PipelineOptions options,
+      Supplier<String> idGenerator,
+      Function<BeamFnApi.ApiServiceDescriptor, ManagedChannel> channelFactory,
+      BiFunction<Function<StreamObserver<StateResponse>, StreamObserver<StateRequest>>,
+          StreamObserver<StateResponse>,
+          StreamObserver<StateRequest>> streamObserverFactory) {
+    this.options = options;
+    this.idGenerator = idGenerator;
+    this.channelFactory = channelFactory;
+    this.streamObserverFactory = streamObserverFactory;
+    this.cache = new ConcurrentHashMap<>();
+  }
+
+  /**(
+   * Creates or returns an existing {@link BeamFnStateClient} depending on whether the passed in
+   * {@link ApiServiceDescriptor} currently has a {@link BeamFnStateClient} bound to the same
+   * channel.
+   */
+  public BeamFnStateClient forApiServiceDescriptor(ApiServiceDescriptor apiServiceDescriptor)
+      throws IOException {
+    return cache.computeIfAbsent(apiServiceDescriptor, this::createBeamFnStateClient);
+  }
+
+  private BeamFnStateClient createBeamFnStateClient(ApiServiceDescriptor apiServiceDescriptor) {
+    return new GrpcStateClient(apiServiceDescriptor);
+  }
+
+  /**
+   * A {@link BeamFnStateClient} for a given {@link ApiServiceDescriptor}.
+   */
+  private class GrpcStateClient implements BeamFnStateClient {
+    private final ApiServiceDescriptor apiServiceDescriptor;
+    private final ConcurrentMap<String, CompletableFuture<StateResponse>> outstandingRequests;
+    private final StreamObserver<StateRequest> outboundObserver;
+    private final ManagedChannel channel;
+    private volatile RuntimeException closed;
+
+    private GrpcStateClient(ApiServiceDescriptor apiServiceDescriptor) {
+      this.apiServiceDescriptor = apiServiceDescriptor;
+      this.outstandingRequests = new ConcurrentHashMap<>();
+      this.channel = channelFactory.apply(apiServiceDescriptor);
+      this.outboundObserver = streamObserverFactory.apply(
+          BeamFnStateGrpc.newStub(channel)::state, new InboundObserver());
+    }
+
+    @Override
+    public void handle(
+        StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) {
+      requestBuilder.setId(idGenerator.get());
+      StateRequest request = requestBuilder.build();
+      outstandingRequests.put(request.getId(), response);
+
+      // If the server closes, gRPC will throw an error if onNext is called.
+      LOG.debug("Sending StateRequest {}", request);
+      outboundObserver.onNext(request);
+    }
+
+    private synchronized void closeAndCleanUp(RuntimeException cause) {
+      if (closed != null) {
+        return;
+      }
+      cache.remove(apiServiceDescriptor);
+      closed = cause;
+
+      // Make a copy of the map to make the view of the outstanding requests consistent.
+      Map<String, CompletableFuture<StateResponse>> outstandingRequestsCopy =
+          new ConcurrentHashMap<>(outstandingRequests);
+
+      if (outstandingRequestsCopy.isEmpty()) {
+        outboundObserver.onCompleted();
+        return;
+      }
+
+      outstandingRequests.clear();
+      LOG.error("BeamFnState failed, clearing outstanding requests {}", outstandingRequestsCopy);
+
+      for (CompletableFuture<StateResponse> entry : outstandingRequestsCopy.values()) {
+        entry.completeExceptionally(cause);
+      }
+    }
+
+    /**
+     * A {@link StreamObserver} which propagates any server side state request responses by
+     * completing the outstanding response future.
+     *
+     * <p>Also propagates server side failures and closes completing any outstanding requests
+     * exceptionally.
+     */
+    private class InboundObserver implements StreamObserver<StateResponse> {
+      @Override
+      public void onNext(StateResponse value) {
+        LOG.debug("Received StateResponse {}", value);
+        CompletableFuture<StateResponse> responseFuture = outstandingRequests.remove(value.getId());
+        if (responseFuture != null) {
+          if (value.getError().isEmpty()) {
+            responseFuture.complete(value);
+          } else {
+            responseFuture.completeExceptionally(new IllegalStateException(value.getError()));
+          }
+        }
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        closeAndCleanUp(t instanceof RuntimeException
+            ? (RuntimeException) t
+            : new RuntimeException(t));
+      }
+
+      @Override
+      public void onCompleted() {
+        closeAndCleanUp(new RuntimeException("Server hanged up."));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fb2d6b58/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/IdGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/IdGeneratorTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/IdGeneratorTest.java
new file mode 100644
index 0000000..10ce393
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/IdGeneratorTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link IdGenerator}. */
+@RunWith(JUnit4.class)
+public class IdGeneratorTest {
+  @Test
+  public void testGenerationNeverMatches() {
+    final int numToGenerate = 10000;
+    Set<String> generatedValues = new HashSet<>();
+    for (int i = 0; i < numToGenerate; ++i) {
+      generatedValues.add(IdGenerator.generate());
+    }
+    assertEquals(numToGenerate, generatedValues.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fb2d6b58/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
new file mode 100644
index 0000000..f0e84c7
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCacheTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.CallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import org.apache.beam.fn.harness.IdGenerator;
+import org.apache.beam.fn.harness.test.TestStreams;
+import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+import org.apache.beam.fn.v1.BeamFnStateGrpc;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamFnStateGrpcClientCache}. */
+@RunWith(JUnit4.class)
+public class BeamFnStateGrpcClientCacheTest {
+  private static final String SUCCESS = "SUCCESS";
+  private static final String FAIL = "FAIL";
+  private static final String TEST_ERROR = "TEST ERROR";
+  private static final String SERVER_ERROR = "SERVER ERROR";
+
+  private ApiServiceDescriptor apiServiceDescriptor;
+  private ManagedChannel testChannel;
+  private Server testServer;
+  private BeamFnStateGrpcClientCache clientCache;
+  private BlockingQueue<StreamObserver<StateResponse>> outboundServerObservers;
+  private BlockingQueue<StateRequest> values;
+
+  @Before
+  public void setUp() throws Exception {
+    values = new LinkedBlockingQueue<>();
+    outboundServerObservers = new LinkedBlockingQueue<>();
+    CallStreamObserver<StateRequest> inboundServerObserver =
+        TestStreams.withOnNext(values::add).build();
+
+    apiServiceDescriptor =
+        ApiServiceDescriptor.newBuilder()
+            .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+            .build();
+    testServer = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+        .addService(new BeamFnStateGrpc.BeamFnStateImplBase() {
+          @Override
+          public StreamObserver<StateRequest> state(
+              StreamObserver<StateResponse> outboundObserver) {
+            Uninterruptibles.putUninterruptibly(outboundServerObservers, outboundObserver);
+            return inboundServerObserver;
+          }
+        })
+        .build();
+    testServer.start();
+
+    testChannel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+
+    clientCache = new BeamFnStateGrpcClientCache(
+        PipelineOptionsFactory.create(),
+        IdGenerator::generate,
+        (ApiServiceDescriptor descriptor) -> testChannel,
+        this::createStreamForTest);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    testServer.shutdownNow();
+    testChannel.shutdownNow();
+  }
+
+  @Test
+  public void testCachingOfClient() throws Exception {
+    assertSame(clientCache.forApiServiceDescriptor(apiServiceDescriptor),
+        clientCache.forApiServiceDescriptor(apiServiceDescriptor));
+    assertNotSame(clientCache.forApiServiceDescriptor(apiServiceDescriptor),
+        clientCache.forApiServiceDescriptor(
+            ApiServiceDescriptor.newBuilder().setId("OTHER").build()));
+  }
+
+  @Test
+  public void testRequestResponses() throws Exception {
+    BeamFnStateClient client = clientCache.forApiServiceDescriptor(apiServiceDescriptor);
+
+    CompletableFuture<StateResponse> successfulResponse = new CompletableFuture<>();
+    CompletableFuture<StateResponse> unsuccessfulResponse = new CompletableFuture<>();
+
+    client.handle(
+        StateRequest.newBuilder().setInstructionReference(SUCCESS), successfulResponse);
+    client.handle(
+        StateRequest.newBuilder().setInstructionReference(FAIL), unsuccessfulResponse);
+
+    // Wait for the client to connect.
+    StreamObserver<StateResponse> outboundServerObserver = outboundServerObservers.take();
+    // Ensure the client doesn't break when sent garbage.
+    outboundServerObserver.onNext(StateResponse.newBuilder().setId("UNKNOWN ID").build());
+
+    // We expect to receive and handle two requests
+    handleServerRequest(outboundServerObserver, values.take());
+    handleServerRequest(outboundServerObserver, values.take());
+
+    // Ensure that the successful and unsuccessful responses were propagated.
+    assertNotNull(successfulResponse.get());
+    try {
+      unsuccessfulResponse.get();
+      fail("Expected unsuccessful response");
+    } catch (ExecutionException e) {
+      assertThat(e.toString(), containsString(TEST_ERROR));
+    }
+  }
+
+  @Test
+  public void testServerErrorCausesPendingAndFutureCallsToFail() throws Exception {
+    BeamFnStateClient client = clientCache.forApiServiceDescriptor(apiServiceDescriptor);
+
+    CompletableFuture<StateResponse> inflight = new CompletableFuture<>();
+    client.handle(StateRequest.newBuilder().setInstructionReference(SUCCESS), inflight);
+
+    // Wait for the client to connect.
+    StreamObserver<StateResponse> outboundServerObserver = outboundServerObservers.take();
+    // Send an error from the server.
+    outboundServerObserver.onError(
+        new StatusRuntimeException(Status.INTERNAL.withDescription(SERVER_ERROR)));
+
+    try {
+      inflight.get();
+      fail("Expected unsuccessful response due to server error");
+    } catch (ExecutionException e) {
+      assertThat(e.toString(), containsString(SERVER_ERROR));
+    }
+
+    // Send a response after the client will have received an error.
+    CompletableFuture<StateResponse> late = new CompletableFuture<>();
+    client.handle(StateRequest.newBuilder().setInstructionReference(SUCCESS), late);
+
+    try {
+      inflight.get();
+      fail("Expected unsuccessful response due to server error");
+    } catch (ExecutionException e) {
+      assertThat(e.toString(), containsString(SERVER_ERROR));
+    }
+  }
+
+  @Test
+  public void testServerCompletionCausesPendingAndFutureCallsToFail() throws Exception {
+    BeamFnStateClient client = clientCache.forApiServiceDescriptor(apiServiceDescriptor);
+
+    CompletableFuture<StateResponse> inflight = new CompletableFuture<>();
+    client.handle(StateRequest.newBuilder().setInstructionReference(SUCCESS), inflight);
+
+    // Wait for the client to connect.
+    StreamObserver<StateResponse> outboundServerObserver = outboundServerObservers.take();
+    // Send that the server is done.
+    outboundServerObserver.onCompleted();
+
+    try {
+      inflight.get();
+      fail("Expected unsuccessful response due to server completion");
+    } catch (ExecutionException e) {
+      assertThat(e.toString(), containsString("Server hanged up"));
+    }
+
+    // Send a response after the client will have received an error.
+    CompletableFuture<StateResponse> late = new CompletableFuture<>();
+    client.handle(StateRequest.newBuilder().setInstructionReference(SUCCESS), late);
+
+    try {
+      inflight.get();
+      fail("Expected unsuccessful response due to server completion");
+    } catch (ExecutionException e) {
+      assertThat(e.toString(), containsString("Server hanged up"));
+    }
+  }
+
+  private void handleServerRequest(
+      StreamObserver<StateResponse> outboundObserver, StateRequest value) {
+    switch (value.getInstructionReference()) {
+      case SUCCESS:
+        outboundObserver.onNext(StateResponse.newBuilder().setId(value.getId()).build());
+        return;
+      case FAIL:
+        outboundObserver.onNext(StateResponse.newBuilder()
+            .setId(value.getId())
+            .setError(TEST_ERROR)
+            .build());
+        return;
+      default:
+        outboundObserver.onNext(StateResponse.newBuilder().setId(value.getId()).build());
+        return;
+    }
+  }
+
+  private <ReqT, RespT> StreamObserver<RespT> createStreamForTest(
+      Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
+      StreamObserver<ReqT> handler) {
+    return clientFactory.apply(handler);
+  }
+}


[2/2] beam git commit: [BEAM-1347] Implement a BeamFnStateClient which communicates over gRPC.

Posted by lc...@apache.org.
[BEAM-1347] Implement a BeamFnStateClient which communicates over gRPC.

This closes #3788


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cd87e32
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cd87e32
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cd87e32

Branch: refs/heads/master
Commit: 1cd87e325798178c2176a9d090dbc5de7f9b46d2
Parents: 585440d fb2d6b5
Author: Luke Cwik <lc...@google.com>
Authored: Wed Aug 30 16:10:52 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 30 16:10:52 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/fn/harness/IdGenerator.java |  33 +++
 .../state/BeamFnStateGrpcClientCache.java       | 173 ++++++++++++++
 .../apache/beam/fn/harness/IdGeneratorTest.java |  40 ++++
 .../state/BeamFnStateGrpcClientCacheTest.java   | 234 +++++++++++++++++++
 4 files changed, 480 insertions(+)
----------------------------------------------------------------------