You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/24 00:45:36 UTC
[1/2] beam git commit: [BEAM-1347] Provide an abstraction which
creates an Iterator view over the Beam Fn State API
Repository: beam
Updated Branches:
refs/heads/master d4db66dd6 -> c4517d04c
[BEAM-1347] Provide an abstraction which creates an Iterator view over the Beam Fn State API
Combining this with the DataStreams.DataStreamDecoder converts the Beam Fn State API into
an input stream backed by multiple logical chunks.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b3f7e218
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b3f7e218
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b3f7e218
Branch: refs/heads/master
Commit: b3f7e2181ef32579646381573f9d147e0220d0d7
Parents: d4db66d
Author: Luke Cwik <lc...@google.com>
Authored: Thu Aug 17 11:49:35 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 23 17:44:41 2017 -0700
----------------------------------------------------------------------
.../fn/harness/state/BeamFnStateClient.java | 16 ++-
.../harness/state/StateFetchingIterators.java | 126 +++++++++++++++++++
.../beam/fn/harness/stream/DataStreams.java | 2 +-
.../state/StateFetchingIteratorsTest.java | 99 +++++++++++++++
4 files changed, 241 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
index 8150530..682adb9 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
@@ -17,9 +17,23 @@
*/
package org.apache.beam.fn.harness.state;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+
/**
- * TODO: Define interface required for handling state calls.
+ * The {@link BeamFnStateClient} is able to forward state requests to a handler which returns
+ * a corresponding response or error if completed unsuccessfully.
*/
public interface BeamFnStateClient {
+ /**
+ * Consumes a state request, populating it with a unique id, and supplies the response via the given future.
+ *
+ * @param requestBuilder A partially completed state request. The id will be populated by the client.
+ * @param response A future containing a corresponding {@link StateResponse} for the supplied
+ * request.
+ */
+ void handle(BeamFnApi.StateRequest.Builder requestBuilder,
+ CompletableFuture<StateResponse> response);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
new file mode 100644
index 0000000..0526183
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
@@ -0,0 +1,126 @@
+package org.apache.beam.fn.harness.state;
+
+import com.google.common.base.Throwables;
+import com.google.protobuf.ByteString;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import org.apache.beam.fn.v1.BeamFnApi.StateGetRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest.Builder;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+
+/**
+ * Adapters which convert a logical series of chunks using continuation tokens over the Beam
+ * Fn State API into an {@link Iterator} of {@link ByteString}s.
+ */
+public class StateFetchingIterators {
+
+ // do not instantiate
+ private StateFetchingIterators() {}
+
+ /**
+ * This adapter handles using the continuation token to provide iteration over all the chunks
+ * returned by the Beam Fn State API using the supplied state client and partially filled
+ * out state request containing a state key.
+ *
+ * @param beamFnStateClient A client for handling state requests.
+ * @param partialStateRequestBuilder A {@link StateRequest} with the
+ * {@link StateRequest#getStateKey()} already set.
+ * @return An {@code Iterator<ByteString>} representing all the requested data.
+ */
+ public static Iterator<ByteString> usingPartialRequestWithStateKey(
+ BeamFnStateClient beamFnStateClient,
+ Supplier<StateRequest.Builder> partialStateRequestBuilder) {
+ return new LazyBlockingStateFetchingIterator(beamFnStateClient, partialStateRequestBuilder);
+ }
+
+ /**
+ * An {@link Iterator} which fetches {@link ByteString} chunks using the State API.
+ *
+ * <p>This iterator will only request a chunk on first access. Also it does not eagerly
+ * pre-fetch any future chunks and blocks whenever required to fetch the next block.
+ */
+ static class LazyBlockingStateFetchingIterator implements Iterator<ByteString> {
+ private enum State { READ_REQUIRED, HAS_NEXT, EOF };
+ private final BeamFnStateClient beamFnStateClient;
+ /** Allows for the partially built state request to be memoized across many requests. */
+ private final Supplier<Builder> stateRequestSupplier;
+ private State currentState;
+ private ByteString continuationToken;
+ private ByteString next;
+
+ LazyBlockingStateFetchingIterator(
+ BeamFnStateClient beamFnStateClient,
+ Supplier<StateRequest.Builder> stateRequestSupplier) {
+ this.currentState = State.READ_REQUIRED;
+ this.beamFnStateClient = beamFnStateClient;
+ this.stateRequestSupplier = stateRequestSupplier;
+ this.continuationToken = ByteString.EMPTY;
+ }
+
+ @Override
+ public boolean hasNext() {
+ switch (currentState) {
+ case EOF:
+ return false;
+ case READ_REQUIRED:
+ CompletableFuture<StateResponse> stateResponseFuture = new CompletableFuture<>();
+ beamFnStateClient.handle(
+ stateRequestSupplier.get().setGet(
+ StateGetRequest.newBuilder().setContinuationToken(continuationToken)),
+ stateResponseFuture);
+ StateResponse stateResponse;
+ try {
+ stateResponse = stateResponseFuture.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ } catch (ExecutionException e) {
+ if (e.getCause() == null) {
+ throw new IllegalStateException(e);
+ }
+ Throwables.throwIfUnchecked(e.getCause());
+ throw new IllegalStateException(e.getCause());
+ }
+ continuationToken = stateResponse.getGet().getContinuationToken();
+ next = stateResponse.getGet().getData();
+ currentState = State.HAS_NEXT;
+ return true;
+ case HAS_NEXT:
+ return true;
+ }
+ throw new IllegalStateException(String.format("Unknown state %s", currentState));
+ }
+
+ @Override
+ public ByteString next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ // If the continuation token is empty, that means we have reached EOF.
+ currentState = ByteString.EMPTY.equals(continuationToken) ? State.EOF : State.READ_REQUIRED;
+ return next;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
index 6967160..3ecd303 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/stream/DataStreams.java
@@ -153,7 +153,7 @@ public class DataStreams {
} catch (IOException e) {
throw new IllegalStateException(e);
}
- // fall through expected
+ return true;
case HAS_NEXT:
return true;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b3f7e218/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
new file mode 100644
index 0000000..67e36e1
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import com.google.common.collect.Iterators;
+import com.google.protobuf.ByteString;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.state.StateFetchingIterators.LazyBlockingStateFetchingIterator;
+import org.apache.beam.fn.v1.BeamFnApi.StateGetResponse;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link StateFetchingIterators}. */
+@RunWith(Enclosed.class)
+public class StateFetchingIteratorsTest {
+ /** Tests for {@link StateFetchingIterators.LazyBlockingStateFetchingIterator}. */
+ @RunWith(JUnit4.class)
+ public static class LazyBlockingStateFetchingIteratorTest {
+
+ @Test
+ public void testEmpty() throws Exception {
+ testFetch(ByteString.EMPTY);
+ }
+
+ @Test
+ public void testNonEmpty() throws Exception {
+ testFetch(ByteString.copyFromUtf8("A"));
+ }
+
+ @Test
+ public void testWithLastByteStringBeingEmpty() throws Exception {
+ testFetch(ByteString.copyFromUtf8("A"), ByteString.EMPTY);
+ }
+
+ @Test
+ public void testMulti() throws Exception {
+ testFetch(ByteString.copyFromUtf8("BC"), ByteString.copyFromUtf8("DEF"));
+ }
+
+ @Test
+ public void testMultiWithEmptyByteStrings() throws Exception {
+ testFetch(ByteString.EMPTY, ByteString.copyFromUtf8("BC"), ByteString.EMPTY,
+ ByteString.EMPTY, ByteString.copyFromUtf8("DEF"), ByteString.EMPTY);
+ }
+
+ private void testFetch(ByteString... expected) {
+ BeamFnStateClient fakeStateClient = new BeamFnStateClient() {
+ @Override
+ public void handle(
+ StateRequest.Builder requestBuilder, CompletableFuture<StateResponse> response) {
+ ByteString continuationToken = requestBuilder.getGet().getContinuationToken();
+ StateGetResponse.Builder builder = StateGetResponse.newBuilder();
+
+ int requestedPosition = 0; // Default position is 0
+ if (!ByteString.EMPTY.equals(continuationToken)) {
+ requestedPosition = Integer.parseInt(continuationToken.toStringUtf8());
+ }
+
+ // Compute the new continuation token
+ ByteString newContinuationToken = ByteString.EMPTY;
+ if (requestedPosition != expected.length - 1) {
+ newContinuationToken = ByteString.copyFromUtf8(Integer.toString(requestedPosition + 1));
+ }
+ response.complete(StateResponse.newBuilder()
+ .setId(requestBuilder.getId())
+ .setGet(StateGetResponse.newBuilder()
+ .setData(expected[requestedPosition])
+ .setContinuationToken(newContinuationToken))
+ .build());
+ }
+ };
+ Iterator<ByteString> byteStrings =
+ new LazyBlockingStateFetchingIterator(fakeStateClient, StateRequest::newBuilder);
+ assertArrayEquals(expected, Iterators.toArray(byteStrings, Object.class));
+ }
+ }
+}
[2/2] beam git commit: [BEAM-1347] Provide an abstraction which
creates an Iterator view over the Beam Fn State API
Posted by lc...@apache.org.
[BEAM-1347] Provide an abstraction which creates an Iterator view over the Beam Fn State API
This closes #3736
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c4517d04
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c4517d04
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c4517d04
Branch: refs/heads/master
Commit: c4517d04c564613ed7dbda627292d239f370509c
Parents: d4db66d b3f7e21
Author: Luke Cwik <lc...@google.com>
Authored: Wed Aug 23 17:45:28 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 23 17:45:28 2017 -0700
----------------------------------------------------------------------
.../fn/harness/state/BeamFnStateClient.java | 16 ++-
.../harness/state/StateFetchingIterators.java | 126 +++++++++++++++++++
.../beam/fn/harness/stream/DataStreams.java | 2 +-
.../state/StateFetchingIteratorsTest.java | 99 +++++++++++++++
4 files changed, 241 insertions(+), 2 deletions(-)
----------------------------------------------------------------------