You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/24 00:45:36 UTC

[1/2] beam git commit: [BEAM-1347] Provide an abstraction which creates an Iterator view over the Beam Fn State API

Repository: beam
Updated Branches:
  refs/heads/master d4db66dd6 -> c4517d04c


[BEAM-1347] Provide an abstraction which creates an Iterator view over the Beam Fn State API

Combining this with the DataStreams.DataStreamDecoder converts the Beam Fn State API into
an input stream backed by multiple logical chunks.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b3f7e218
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b3f7e218
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b3f7e218

Branch: refs/heads/master
Commit: b3f7e2181ef32579646381573f9d147e0220d0d7
Parents: d4db66d
Author: Luke Cwik <lc...@google.com>
Authored: Thu Aug 17 11:49:35 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 23 17:44:41 2017 -0700

----------------------------------------------------------------------
 .../fn/harness/state/BeamFnStateClient.java     |  16 ++-
 .../harness/state/StateFetchingIterators.java   | 126 +++++++++++++++++++
 .../beam/fn/harness/stream/DataStreams.java     |   2 +-
 .../state/StateFetchingIteratorsTest.java       |  99 +++++++++++++++
 4 files changed, 241 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
index 8150530..682adb9 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
@@ -17,9 +17,23 @@
  */
 package org.apache.beam.fn.harness.state;
 
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+
 /**
- * TODO: Define interface required for handling state calls.
+ * The {@link BeamFnStateClient} forwards state requests to a handler which completes the
+ * supplied future with the corresponding response, or with an error if the request fails.
  */
 public interface BeamFnStateClient {
 
+  /**
+   * Consumes a state request, populating a unique id; the response is delivered via the future.
+   *
+   * @param requestBuilder A partially built state request; the client populates its id.
+   * @param response A future containing a corresponding {@link StateResponse} for the supplied
+   * request.
+   */
+  void handle(BeamFnApi.StateRequest.Builder requestBuilder,
+      CompletableFuture<StateResponse> response);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
new file mode 100644
index 0000000..0526183
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import com.google.common.base.Throwables;
+import com.google.protobuf.ByteString;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import org.apache.beam.fn.v1.BeamFnApi.StateGetRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+
+/**
+ * Adapters which convert a logical series of chunks using continuation tokens over the Beam
+ * Fn State API into an {@link Iterator} of {@link ByteString}s.
+ */
+public final class StateFetchingIterators {
+
+  // do not instantiate
+  private StateFetchingIterators() {}
+
+  /**
+   * This adapter handles using the continuation token to provide iteration over all the chunks
+   * returned by the Beam Fn State API using the supplied state client and partially filled
+   * out state request containing a state key.
+   *
+   * @param beamFnStateClient A client for handling state requests.
+   * @param partialStateRequestBuilder Supplies {@link StateRequest.Builder}s with the
+   * {@link StateRequest#getStateKey()} already set; invoked once per chunk request.
+   * @return An {@code Iterator<ByteString>} representing all the requested data.
+   */
+  public static Iterator<ByteString> usingPartialRequestWithStateKey(
+      BeamFnStateClient beamFnStateClient,
+      Supplier<StateRequest.Builder> partialStateRequestBuilder) {
+    return new LazyBlockingStateFetchingIterator(beamFnStateClient, partialStateRequestBuilder);
+  }
+
+  /**
+   * An {@link Iterator} which fetches {@link ByteString} chunks using the State API.
+   *
+   * <p>The first chunk is only requested on first access. Chunks are never pre-fetched; the
+   * iterator blocks whenever it needs to fetch the next chunk.
+   */
+  static class LazyBlockingStateFetchingIterator implements Iterator<ByteString> {
+    private enum State { READ_REQUIRED, HAS_NEXT, EOF }
+
+    private final BeamFnStateClient beamFnStateClient;
+    /** Allows for the partially built state request to be memoized across many requests. */
+    private final Supplier<StateRequest.Builder> stateRequestSupplier;
+    private State currentState;
+    private ByteString continuationToken;
+    private ByteString next;
+
+    LazyBlockingStateFetchingIterator(
+        BeamFnStateClient beamFnStateClient,
+        Supplier<StateRequest.Builder> stateRequestSupplier) {
+      this.currentState = State.READ_REQUIRED;
+      this.beamFnStateClient = beamFnStateClient;
+      this.stateRequestSupplier = stateRequestSupplier;
+      this.continuationToken = ByteString.EMPTY;
+    }
+
+    @Override
+    public boolean hasNext() {
+      switch (currentState) {
+        case EOF:
+          return false;
+        case READ_REQUIRED:
+          CompletableFuture<StateResponse> stateResponseFuture = new CompletableFuture<>();
+          beamFnStateClient.handle(
+              stateRequestSupplier.get().setGet(
+                  StateGetRequest.newBuilder().setContinuationToken(continuationToken)),
+              stateResponseFuture);
+          StateResponse stateResponse;
+          try {
+            stateResponse = stateResponseFuture.get();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(e);
+          } catch (ExecutionException e) {
+            if (e.getCause() == null) {
+              throw new IllegalStateException(e);
+            }
+            Throwables.throwIfUnchecked(e.getCause());
+            throw new IllegalStateException(e.getCause());
+          }
+          continuationToken = stateResponse.getGet().getContinuationToken();
+          next = stateResponse.getGet().getData();
+          currentState = State.HAS_NEXT;
+          return true;
+        case HAS_NEXT:
+          return true;
+      }
+      throw new IllegalStateException(String.format("Unknown state %s", currentState));
+    }
+
+    @Override
+    public ByteString next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      // If the continuation token is empty, that means we have reached EOF.
+      currentState = ByteString.EMPTY.equals(continuationToken) ? State.EOF : State.READ_REQUIRED;
+      return next;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
index 6967160..3ecd303 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
@@ -153,7 +153,7 @@ public class DataStreams {
           } catch (IOException e) {
             throw new IllegalStateException(e);
           }
-          // fall through expected
+          return true;
         case HAS_NEXT:
           return true;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
new file mode 100644
index 0000000..67e36e1
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import com.google.common.collect.Iterators;
+import com.google.protobuf.ByteString;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.state.StateFetchingIterators.LazyBlockingStateFetchingIterator;
+import org.apache.beam.fn.v1.BeamFnApi.StateGetResponse;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link StateFetchingIterators}. */
+@RunWith(Enclosed.class)
+public class StateFetchingIteratorsTest {
+  /** Tests for {@link StateFetchingIterators.LazyBlockingStateFetchingIterator}. */
+  @RunWith(JUnit4.class)
+  public static class LazyBlockingStateFetchingIteratorTest {
+
+    @Test
+    public void testEmpty() throws Exception {
+      testFetch(ByteString.EMPTY);
+    }
+
+    @Test
+    public void testNonEmpty() throws Exception {
+      testFetch(ByteString.copyFromUtf8("A"));
+    }
+
+    @Test
+    public void testWithLastByteStringBeingEmpty() throws Exception {
+      testFetch(ByteString.copyFromUtf8("A"), ByteString.EMPTY);
+    }
+
+    @Test
+    public void testMulti() throws Exception {
+      testFetch(ByteString.copyFromUtf8("BC"), ByteString.copyFromUtf8("DEF"));
+    }
+
+    @Test
+    public void testMultiWithEmptyByteStrings() throws Exception {
+      testFetch(ByteString.EMPTY, ByteString.copyFromUtf8("BC"), ByteString.EMPTY,
+          ByteString.EMPTY, ByteString.copyFromUtf8("DEF"), ByteString.EMPTY);
+    }
+
+    private void testFetch(ByteString... expected) {
+      BeamFnStateClient fakeStateClient = new BeamFnStateClient() {
+        @Override
+        public void handle(
+            StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) {
+          // The fake encodes the index of the next chunk to return as the continuation token.
+          ByteString continuationToken = requestBuilder.getGet().getContinuationToken();
+
+          int requestedPosition = 0; // Default position is 0
+          if (!ByteString.EMPTY.equals(continuationToken)) {
+            requestedPosition = Integer.parseInt(continuationToken.toStringUtf8());
+          }
+
+          // Compute the new continuation token
+          ByteString newContinuationToken = ByteString.EMPTY;
+          if (requestedPosition != expected.length - 1) {
+            newContinuationToken = ByteString.copyFromUtf8(Integer.toString(requestedPosition + 1));
+          }
+          response.complete(StateResponse.newBuilder()
+              .setId(requestBuilder.getId())
+              .setGet(StateGetResponse.newBuilder()
+                  .setData(expected[requestedPosition])
+                  .setContinuationToken(newContinuationToken))
+              .build());
+        }
+      };
+      Iterator<ByteString> byteStrings =
+          new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest::newBuilder);
+      assertArrayEquals(expected, Iterators.toArray(byteStrings, ByteString.class));
+    }
+  }
+}


[2/2] beam git commit: [BEAM-1347] Provide an abstraction which creates an Iterator view over the Beam Fn State API

Posted by lc...@apache.org.
[BEAM-1347] Provide an abstraction which creates an Iterator view over the Beam Fn State API

This closes #3736


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c4517d04
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c4517d04
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c4517d04

Branch: refs/heads/master
Commit: c4517d04c564613ed7dbda627292d239f370509c
Parents: d4db66d b3f7e21
Author: Luke Cwik <lc...@google.com>
Authored: Wed Aug 23 17:45:28 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 23 17:45:28 2017 -0700

----------------------------------------------------------------------
 .../fn/harness/state/BeamFnStateClient.java     |  16 ++-
 .../harness/state/StateFetchingIterators.java   | 126 +++++++++++++++++++
 .../beam/fn/harness/stream/DataStreams.java     |   2 +-
 .../state/StateFetchingIteratorsTest.java       |  99 +++++++++++++++
 4 files changed, 241 insertions(+), 2 deletions(-)
----------------------------------------------------------------------