You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/28 16:54:45 UTC
[1/2] beam git commit: [BEAM-1347] Add a BagUserState implementation
over the BeamFnStateClient
Repository: beam
Updated Branches:
refs/heads/master f634aecbb -> ba5c4071d
[BEAM-1347] Add a BagUserState implementation over the BeamFnStateClient
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d36a261
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d36a261
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d36a261
Branch: refs/heads/master
Commit: 8d36a261d4e8c6569e9036a27d45c00daccd908b
Parents: 20d88db
Author: Luke Cwik <lc...@google.com>
Authored: Thu Aug 24 18:34:47 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 25 08:53:27 2017 -0700
----------------------------------------------------------------------
.../beam/fn/harness/state/BagUserState.java | 121 +++++++++++++++++++
.../state/LazyCachingIteratorToIterable.java | 72 +++++++++++
.../beam/fn/harness/state/BagUserStateTest.java | 106 ++++++++++++++++
.../fn/harness/state/FakeBeamFnStateClient.java | 110 +++++++++++++++++
.../LazyCachingIteratorToIterableTest.java | 76 ++++++++++++
5 files changed, 485 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
new file mode 100644
index 0000000..2d7f0c8
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.stream.DataStreams;
+import org.apache.beam.fn.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest.Builder;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} issues any pending clear and append requests. Once closed,
+ * every method of this object throws {@link IllegalStateException}.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache
+ * memory pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class BagUserState<T> {
+ private final BeamFnStateClient beamFnStateClient;
+ private final String stateId;
+ private final Coder<T> coder;
+ private final Supplier<Builder> partialRequestSupplier;
+ private Iterable<T> oldValues;
+ private ArrayList<T> newValues;
+ private List<T> unmodifiableNewValues;
+ private boolean isClosed;
+
+ public BagUserState(
+ BeamFnStateClient beamFnStateClient,
+ String stateId,
+ Coder<T> coder,
+ Supplier<Builder> partialRequestSupplier) {
+ this.beamFnStateClient = beamFnStateClient;
+ this.stateId = stateId;
+ this.coder = coder;
+ this.partialRequestSupplier = partialRequestSupplier;
+ this.oldValues = new LazyCachingIteratorToIterable<>(
+ new DataStreams.DataStreamDecoder(coder,
+ DataStreams.inbound(
+ StateFetchingIterators.usingPartialRequestWithStateKey(
+ beamFnStateClient,
+ partialRequestSupplier))));
+ this.newValues = new ArrayList<>();
+ this.unmodifiableNewValues = Collections.unmodifiableList(newValues);
+ }
+
+ public Iterable<T> get() {
+ checkState(!isClosed,
+ "Bag user state is no longer usable because it is closed for %s", stateId);
+ // clear() nulls out oldValues, so after a clear only locally appended values are returned.
+ if (oldValues == null) {
+ return unmodifiableNewValues;
+ }
+ return Iterables.concat(oldValues, unmodifiableNewValues);
+ }
+
+ public void append(T t) {
+ checkState(!isClosed,
+ "Bag user state is no longer usable because it is closed for %s", stateId);
+ newValues.add(t);
+ }
+
+ public void clear() {
+ checkState(!isClosed,
+ "Bag user state is no longer usable because it is closed for %s", stateId);
+ oldValues = null;
+ newValues.clear();
+ }
+
+ public void asyncClose() throws Exception {
+ checkState(!isClosed,
+ "Bag user state is no longer usable because it is closed for %s", stateId);
+ if (oldValues == null) {
+ beamFnStateClient.handle(
+ partialRequestSupplier.get()
+ .setClear(StateClearRequest.getDefaultInstance()),
+ new CompletableFuture<>());
+ }
+ if (!newValues.isEmpty()) {
+ ByteString.Output out = ByteString.newOutput();
+ for (T newValue : newValues) {
+ // TODO: Replace with chunking output stream
+ coder.encode(newValue, out);
+ }
+ beamFnStateClient.handle(
+ partialRequestSupplier.get()
+ .setAppend(StateAppendRequest.newBuilder().setData(out.toByteString())),
+ new CompletableFuture<>());
+ }
+ isClosed = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java
new file mode 100644
index 0000000..0a43317
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Converts an iterator to an iterable, lazily loading values from the underlying iterator
+ * and caching them to support reiteration.
+ */
+class LazyCachingIteratorToIterable<T> implements Iterable<T> {
+ private final List<T> cachedElements;
+ private final Iterator<T> iterator;
+
+ public LazyCachingIteratorToIterable(Iterator<T> iterator) {
+ this.cachedElements = new ArrayList<>();
+ this.iterator = iterator;
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return new CachingIterator();
+ }
+
+ /** An {@link Iterator} which fetches values and adds them to the cached elements list. */
+ private class CachingIterator implements Iterator<T> {
+ private int position = 0;
+
+ private CachingIterator() {
+ }
+
+ @Override
+ public boolean hasNext() {
+ // Check the cache first: the shared iterator may already be drained by another pass.
+ return position < cachedElements.size() || iterator.hasNext();
+ }
+
+ @Override
+ public T next() {
+ if (position < cachedElements.size()) {
+ return cachedElements.get(position++);
+ }
+
+ if (!iterator.hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ T rval = iterator.next();
+ cachedElements.add(rval);
+ position += 1;
+ return rval;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
new file mode 100644
index 0000000..f3c76ac
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import org.apache.beam.fn.v1.BeamFnApi.StateKey;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BagUserState}. */
+@RunWith(JUnit4.class)
+public class BagUserStateTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testGet() throws Exception {
+ FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(ImmutableMap.of(
+ key("A"), encode("A1", "A2", "A3")));
+ BagUserState<String> userState =
+ new BagUserState<>(fakeClient, "A", StringUtf8Coder.of(), () -> requestForId("A"));
+ assertArrayEquals(new String[]{ "A1", "A2", "A3" },
+ Iterables.toArray(userState.get(), String.class));
+
+ userState.asyncClose();
+ thrown.expect(IllegalStateException.class);
+ userState.get();
+ }
+
+ @Test
+ public void testAppend() throws Exception {
+ FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(ImmutableMap.of(
+ key("A"), encode("A1")));
+ BagUserState<String> userState =
+ new BagUserState<>(fakeClient, "A", StringUtf8Coder.of(), () -> requestForId("A"));
+ userState.append("A2");
+ userState.append("A3");
+ userState.asyncClose();
+
+ assertEquals(encode("A1", "A2", "A3"), fakeClient.getData().get(key("A")));
+ thrown.expect(IllegalStateException.class);
+ userState.append("A4");
+ }
+
+ @Test
+ public void testClear() throws Exception {
+ FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(ImmutableMap.of(
+ key("A"), encode("A1", "A2", "A3")));
+ BagUserState<String> userState =
+ new BagUserState<>(fakeClient, "A", StringUtf8Coder.of(), () -> requestForId("A"));
+
+ userState.clear();
+ userState.append("A1");
+ userState.clear();
+ userState.asyncClose();
+
+ assertNull(fakeClient.getData().get(key("A")));
+ thrown.expect(IllegalStateException.class);
+ userState.clear();
+ }
+
+ private StateRequest.Builder requestForId(String id) {
+ return StateRequest.newBuilder().setStateKey(
+ StateKey.newBuilder().setBagUserState(
+ StateKey.BagUserState.newBuilder().setKey(ByteString.copyFromUtf8(id))));
+ }
+
+ private StateKey key(String id) {
+ return StateKey.newBuilder().setBagUserState(
+ StateKey.BagUserState.newBuilder().setKey(ByteString.copyFromUtf8(id))).build();
+ }
+
+ private ByteString encode(String ... values) throws IOException {
+ ByteString.Output out = ByteString.newOutput();
+ for (String value : values) {
+ StringUtf8Coder.of().encode(value, out);
+ }
+ return out.toByteString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
new file mode 100644
index 0000000..d260207
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import com.google.protobuf.ByteString;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.fn.v1.BeamFnApi.StateAppendResponse;
+import org.apache.beam.fn.v1.BeamFnApi.StateClearResponse;
+import org.apache.beam.fn.v1.BeamFnApi.StateGetResponse;
+import org.apache.beam.fn.v1.BeamFnApi.StateKey;
+import org.apache.beam.fn.v1.BeamFnApi.StateKey.TypeCase;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest.RequestCase;
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+
+/** A fake, in-memory implementation of a {@link BeamFnStateClient} to aid with testing. */
+public class FakeBeamFnStateClient implements BeamFnStateClient {
+ private final Map<StateKey, ByteString> data;
+ private int currentId;
+
+ public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
+ this.data = new ConcurrentHashMap<>(initialData);
+ }
+
+ public Map<StateKey, ByteString> getData() {
+ return Collections.unmodifiableMap(data);
+ }
+
+ @Override
+ public void handle(StateRequest.Builder requestBuilder,
+ CompletableFuture<StateResponse> responseFuture) {
+ // Callers must leave the id unset; this fake assigns a freshly generated one itself.
+ assertEquals("", requestBuilder.getId());
+ requestBuilder.setId(generateId());
+
+ StateRequest request = requestBuilder.build();
+ StateKey key = request.getStateKey();
+ StateResponse.Builder response;
+
+ assertNotEquals(RequestCase.REQUEST_NOT_SET, request.getRequestCase());
+ assertNotEquals(TypeCase.TYPE_NOT_SET, key.getTypeCase());
+ // multimap side input and runner based state keys only support get requests
+ if (key.getTypeCase() == TypeCase.MULTIMAP_SIDE_INPUT
+ || key.getTypeCase() == TypeCase.RUNNER) {
+ assertEquals(RequestCase.GET, request.getRequestCase());
+ }
+
+ switch (request.getRequestCase()) {
+ case GET:
+ // Chunk gets into 5 byte return blocks; an absent key is served as an empty value.
+ ByteString byteString = data.getOrDefault(request.getStateKey(), ByteString.EMPTY);
+ int block = 0;
+ if (request.getGet().getContinuationToken().size() > 0) {
+ block = Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8());
+ }
+ ByteString returnBlock = byteString.substring(
+ block * 5, Math.min(byteString.size(), (block + 1) * 5));
+ ByteString continuationToken = ByteString.EMPTY;
+ if (byteString.size() > (block + 1) * 5) {
+ continuationToken = ByteString.copyFromUtf8(Integer.toString(block + 1));
+ }
+ response = StateResponse.newBuilder().setGet(StateGetResponse.newBuilder()
+ .setData(returnBlock)
+ .setContinuationToken(continuationToken));
+ break;
+
+ case CLEAR:
+ data.remove(request.getStateKey());
+ response = StateResponse.newBuilder().setClear(StateClearResponse.getDefaultInstance());
+ break;
+
+ case APPEND:
+ ByteString previousValue = data.getOrDefault(request.getStateKey(), ByteString.EMPTY);
+ data.put(request.getStateKey(), previousValue.concat(request.getAppend().getData()));
+ response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
+ break;
+
+ default:
+ throw new IllegalStateException(
+ String.format("Unknown request type %s", request.getRequestCase()));
+ }
+
+ responseFuture.complete(response.setId(requestBuilder.getId()).build());
+ }
+
+ private String generateId() {
+ return Integer.toString(++currentId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java
new file mode 100644
index 0000000..53eefb4
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link LazyCachingIteratorToIterable}. */
+@RunWith(JUnit4.class)
+public class LazyCachingIteratorToIterableTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testEmptyIterator() {
+ Iterable<Object> iterable = new LazyCachingIteratorToIterable<>(Iterators.forArray());
+ assertArrayEquals(new Object[0], Iterables.toArray(iterable, Object.class));
+ // iterate multiple times
+ assertArrayEquals(new Object[0], Iterables.toArray(iterable, Object.class));
+
+ thrown.expect(NoSuchElementException.class);
+ iterable.iterator().next();
+ }
+
+ @Test
+ public void testInterleavedIteration() {
+ Iterable<String> iterable =
+ new LazyCachingIteratorToIterable<>(Iterators.forArray("A", "B", "C"));
+
+ Iterator<String> iterator1 = iterable.iterator();
+ assertTrue(iterator1.hasNext());
+ assertEquals("A", iterator1.next());
+ Iterator<String> iterator2 = iterable.iterator();
+ assertTrue(iterator2.hasNext());
+ assertEquals("A", iterator2.next());
+ assertTrue(iterator2.hasNext());
+ assertEquals("B", iterator2.next());
+ assertTrue(iterator1.hasNext());
+ assertEquals("B", iterator1.next());
+ assertTrue(iterator1.hasNext());
+ assertEquals("C", iterator1.next());
+ assertFalse(iterator1.hasNext());
+ assertTrue(iterator2.hasNext());
+ assertEquals("C", iterator2.next());
+ assertFalse(iterator2.hasNext());
+
+ thrown.expect(NoSuchElementException.class);
+ iterator1.next();
+ }
+}
[2/2] beam git commit: [BEAM-1347] Add a BagUserState implementation
over the BeamFnStateClient
Posted by lc...@apache.org.
[BEAM-1347] Add a BagUserState implementation over the BeamFnStateClient
This closes #3760
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ba5c4071
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ba5c4071
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ba5c4071
Branch: refs/heads/master
Commit: ba5c4071dfa1831262bc01aa33f06ac0e5561484
Parents: f634aec 8d36a26
Author: Luke Cwik <lc...@google.com>
Authored: Mon Aug 28 09:54:38 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Aug 28 09:54:38 2017 -0700
----------------------------------------------------------------------
.../beam/fn/harness/state/BagUserState.java | 121 +++++++++++++++++++
.../state/LazyCachingIteratorToIterable.java | 72 +++++++++++
.../beam/fn/harness/state/BagUserStateTest.java | 106 ++++++++++++++++
.../fn/harness/state/FakeBeamFnStateClient.java | 110 +++++++++++++++++
.../LazyCachingIteratorToIterableTest.java | 76 ++++++++++++
5 files changed, 485 insertions(+)
----------------------------------------------------------------------