You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/28 16:54:45 UTC

[1/2] beam git commit: [BEAM-1347] Add a BagUserState implementation over the BeamFnStateClient

Repository: beam
Updated Branches:
  refs/heads/master f634aecbb -> ba5c4071d


[BEAM-1347] Add a BagUserState implementation over the BeamFnStateClient


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d36a261
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d36a261
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d36a261

Branch: refs/heads/master
Commit: 8d36a261d4e8c6569e9036a27d45c00daccd908b
Parents: 20d88db
Author: Luke Cwik <lc...@google.com>
Authored: Thu Aug 24 18:34:47 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 25 08:53:27 2017 -0700

----------------------------------------------------------------------
 .../beam/fn/harness/state/BagUserState.java     | 121 +++++++++++++++++++
 .../state/LazyCachingIteratorToIterable.java    |  72 +++++++++++
 .../beam/fn/harness/state/BagUserStateTest.java | 106 ++++++++++++++++
 .../fn/harness/state/FakeBeamFnStateClient.java | 110 +++++++++++++++++
 .../LazyCachingIteratorToIterableTest.java      |  76 ++++++++++++
 5 files changed, 485 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
new file mode 100644
index 0000000..2d7f0c8
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.beam.fn.harness.stream.DataStreams;
+import org.apache.beam.fn.v1.BeamFnApi.StateAppendRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest.Builder;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear
+ * and persist values.
+ *
+ * <p>Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * <p>TODO: Move to an async persist model where persistence is signalled based upon cache
+ * memory pressure and its need to flush.
+ *
+ * <p>TODO: Support block level caching and prefetch.
+ */
+public class BagUserState<T> {
+  private final BeamFnStateClient beamFnStateClient;
+  private final String stateId;
+  private final Coder<T> coder;
+  private final Supplier<Builder> partialRequestSupplier;
+  private Iterable<T> oldValues;
+  private ArrayList<T> newValues;
+  private List<T> unmodifiableNewValues;
+  private boolean isClosed;
+
+  public BagUserState(
+      BeamFnStateClient beamFnStateClient,
+      String stateId,
+      Coder<T> coder,
+      Supplier<Builder> partialRequestSupplier) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.stateId = stateId;
+    this.coder = coder;
+    this.partialRequestSupplier = partialRequestSupplier;
+    this.oldValues = new LazyCachingIteratorToIterable<>(
+        new DataStreams.DataStreamDecoder(coder,
+            DataStreams.inbound(
+                StateFetchingIterators.usingPartialRequestWithStateKey(
+                    beamFnStateClient,
+                    partialRequestSupplier))));
+    this.newValues = new ArrayList<>();
+    this.unmodifiableNewValues = Collections.unmodifiableList(newValues);
+  }
+
+  public Iterable<T> get() {
+    checkState(!isClosed,
+        "Bag user state is no longer usable because it is closed for %s", stateId);
+    // If we were cleared we should disregard old values.
+    if (oldValues == null) {
+      return unmodifiableNewValues;
+    }
+    return Iterables.concat(oldValues, unmodifiableNewValues);
+  }
+
+  public void append(T t) {
+    checkState(!isClosed,
+        "Bag user state is no longer usable because it is closed for %s", stateId);
+    newValues.add(t);
+  }
+
+  public void clear() {
+    checkState(!isClosed,
+        "Bag user state is no longer usable because it is closed for %s", stateId);
+    oldValues = null;
+    newValues.clear();
+  }
+
+  public void asyncClose() throws Exception {
+    checkState(!isClosed,
+        "Bag user state is no longer usable because it is closed for %s", stateId);
+    if (oldValues == null) {
+      beamFnStateClient.handle(
+          partialRequestSupplier.get()
+              .setClear(StateClearRequest.getDefaultInstance()),
+          new CompletableFuture<>());
+    }
+    if (!newValues.isEmpty()) {
+      ByteString.Output out = ByteString.newOutput();
+      for (T newValue : newValues) {
+        // TODO: Replace with chunking output stream
+        coder.encode(newValue, out);
+      }
+      beamFnStateClient.handle(
+          partialRequestSupplier.get()
+              .setAppend(StateAppendRequest.newBuilder().setData(out.toByteString())),
+          new CompletableFuture<>());
+    }
+    isClosed = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java
new file mode 100644
index 0000000..0a43317
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Converts an iterator to an iterable lazily loading values from the underlying iterator
+ * and caching them to support reiteration.
+ */
+class LazyCachingIteratorToIterable<T> implements Iterable<T> {
+  private final List<T> cachedElements;
+  private final Iterator<T> iterator;
+
+  public LazyCachingIteratorToIterable(Iterator<T> iterator) {
+    this.cachedElements = new ArrayList<>();
+    this.iterator = iterator;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    return new CachingIterator();
+  }
+
+  /** An {@link Iterator} which adds and fetched values into the cached elements list. */
+  private class CachingIterator implements Iterator<T> {
+    private int position = 0;
+
+    private CachingIterator() {
+    }
+
+    @Override
+    public boolean hasNext() {
+      // The order of the short circuit is important below.
+      return position < cachedElements.size() || iterator.hasNext();
+    }
+
+    @Override
+    public T next() {
+      if (position < cachedElements.size()) {
+        return cachedElements.get(position++);
+      }
+
+      if (!iterator.hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      T rval = iterator.next();
+      cachedElements.add(rval);
+      position += 1;
+      return rval;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
new file mode 100644
index 0000000..f3c76ac
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import org.apache.beam.fn.v1.BeamFnApi.StateKey;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BagUserState}. */
+@RunWith(JUnit4.class)
+public class BagUserStateTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testGet() throws Exception {
+    FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(ImmutableMap.of(
+        key("A"), encode("A1", "A2", "A3")));
+    BagUserState<String> userState =
+        new BagUserState<>(fakeClient, "A", StringUtf8Coder.of(), () -> requestForId("A"));
+    assertArrayEquals(new String[]{ "A1", "A2", "A3" },
+        Iterables.toArray(userState.get(), String.class));
+
+    userState.asyncClose();
+    thrown.expect(IllegalStateException.class);
+    userState.get();
+  }
+
+  @Test
+  public void testAppend() throws Exception {
+    FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(ImmutableMap.of(
+        key("A"), encode("A1")));
+    BagUserState<String> userState =
+        new BagUserState<>(fakeClient, "A", StringUtf8Coder.of(), () -> requestForId("A"));
+    userState.append("A2");
+    userState.append("A3");
+    userState.asyncClose();
+
+    assertEquals(encode("A1", "A2", "A3"), fakeClient.getData().get(key("A")));
+    thrown.expect(IllegalStateException.class);
+    userState.append("A4");
+  }
+
+  @Test
+  public void testClear() throws Exception {
+    FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(ImmutableMap.of(
+        key("A"), encode("A1", "A2", "A3")));
+    BagUserState<String> userState =
+        new BagUserState<>(fakeClient, "A", StringUtf8Coder.of(), () -> requestForId("A"));
+
+    userState.clear();
+    userState.append("A1");
+    userState.clear();
+    userState.asyncClose();
+
+    assertNull(fakeClient.getData().get(key("A")));
+    thrown.expect(IllegalStateException.class);
+    userState.clear();
+  }
+
+  private StateRequest.Builder requestForId(String id) {
+    return StateRequest.newBuilder().setStateKey(
+        StateKey.newBuilder().setBagUserState(
+            StateKey.BagUserState.newBuilder().setKey(ByteString.copyFromUtf8(id))));
+  }
+
+  private StateKey key(String id) {
+    return StateKey.newBuilder().setBagUserState(
+        StateKey.BagUserState.newBuilder().setKey(ByteString.copyFromUtf8(id))).build();
+  }
+
+  private ByteString encode(String ... values) throws IOException {
+    ByteString.Output out = ByteString.newOutput();
+    for (String value : values) {
+      StringUtf8Coder.of().encode(value, out);
+    }
+    return out.toByteString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
new file mode 100644
index 0000000..d260207
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import com.google.protobuf.ByteString;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.fn.v1.BeamFnApi.StateAppendResponse;
+import org.apache.beam.fn.v1.BeamFnApi.StateClearResponse;
+import org.apache.beam.fn.v1.BeamFnApi.StateGetResponse;
+import org.apache.beam.fn.v1.BeamFnApi.StateKey;
+import org.apache.beam.fn.v1.BeamFnApi.StateKey.TypeCase;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
+import org.apache.beam.fn.v1.BeamFnApi.StateRequest.RequestCase;
+import org.apache.beam.fn.v1.BeamFnApi.StateResponse;
+
+/** A fake implementation of a {@link BeamFnStateClient} to aid with testing. */
+public class FakeBeamFnStateClient implements BeamFnStateClient {
+  private final Map<StateKey, ByteString> data;
+  private int currentId;
+
+  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
+    this.data = new ConcurrentHashMap<>(initialData);
+  }
+
+  public Map<StateKey, ByteString> getData() {
+    return Collections.unmodifiableMap(data);
+  }
+
+  @Override
+  public void handle(StateRequest.Builder requestBuilder,
+      CompletableFuture<StateResponse> responseFuture) {
+    // The id should never be filled out
+    assertEquals("", requestBuilder.getId());
+    requestBuilder.setId(generateId());
+
+    StateRequest request = requestBuilder.build();
+    StateKey key = request.getStateKey();
+    StateResponse.Builder response;
+
+    assertNotEquals(RequestCase.REQUEST_NOT_SET, request.getRequestCase());
+    assertNotEquals(TypeCase.TYPE_NOT_SET, key.getTypeCase());
+    // multimap side input and runner based state keys only support get requests
+    if (key.getTypeCase() == TypeCase.MULTIMAP_SIDE_INPUT
+        || key.getTypeCase() == TypeCase.RUNNER) {
+      assertEquals(RequestCase.GET, request.getRequestCase());
+    }
+
+    switch (request.getRequestCase()) {
+      case GET:
+        // Chunk gets into 5 byte return blocks
+        ByteString byteString = data.get(request.getStateKey());
+        int block = 0;
+        if (request.getGet().getContinuationToken().size() > 0) {
+          block = Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8());
+        }
+        ByteString returnBlock = byteString.substring(
+            block * 5, Math.min(byteString.size(), (block + 1) * 5));
+        ByteString continuationToken = ByteString.EMPTY;
+        if (byteString.size() > (block + 1) * 5) {
+          continuationToken = ByteString.copyFromUtf8(Integer.toString(block + 1));
+        }
+        response = StateResponse.newBuilder().setGet(StateGetResponse.newBuilder()
+            .setData(returnBlock)
+            .setContinuationToken(continuationToken));
+        break;
+
+      case CLEAR:
+        data.remove(request.getStateKey());
+        response = StateResponse.newBuilder().setClear(StateClearResponse.getDefaultInstance());
+        break;
+
+      case APPEND:
+        ByteString previousValue = data.getOrDefault(request.getStateKey(), ByteString.EMPTY);
+        data.put(request.getStateKey(), previousValue.concat(request.getAppend().getData()));
+        response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
+        break;
+
+      default:
+        throw new IllegalStateException(
+            String.format("Unknown request type %s", request.getRequestCase()));
+    }
+
+    responseFuture.complete(response.setId(requestBuilder.getId()).build());
+  }
+
+  private String generateId() {
+    return Integer.toString(++currentId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8d36a261/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java
new file mode 100644
index 0000000..53eefb4
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/LazyCachingIteratorToIterableTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link LazyCachingIteratorToIterable}. */
+@RunWith(JUnit4.class)
+public class LazyCachingIteratorToIterableTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testEmptyIterator() {
+    Iterable<Object> iterable = new LazyCachingIteratorToIterable<>(Iterators.forArray());
+    assertArrayEquals(new Object[0], Iterables.toArray(iterable, Object.class));
+    // iterate multiple times
+    assertArrayEquals(new Object[0], Iterables.toArray(iterable, Object.class));
+
+    thrown.expect(NoSuchElementException.class);
+    iterable.iterator().next();
+  }
+
+  @Test
+  public void testInterleavedIteration() {
+    Iterable<String> iterable =
+        new LazyCachingIteratorToIterable<>(Iterators.forArray("A", "B", "C"));
+
+    Iterator<String> iterator1 = iterable.iterator();
+    assertTrue(iterator1.hasNext());
+    assertEquals("A", iterator1.next());
+    Iterator<String> iterator2 = iterable.iterator();
+    assertTrue(iterator2.hasNext());
+    assertEquals("A", iterator2.next());
+    assertTrue(iterator2.hasNext());
+    assertEquals("B", iterator2.next());
+    assertTrue(iterator1.hasNext());
+    assertEquals("B", iterator1.next());
+    assertTrue(iterator1.hasNext());
+    assertEquals("C", iterator1.next());
+    assertFalse(iterator1.hasNext());
+    assertTrue(iterator2.hasNext());
+    assertEquals("C", iterator2.next());
+    assertFalse(iterator2.hasNext());
+
+    thrown.expect(NoSuchElementException.class);
+    iterator1.next();
+  }
+}


[2/2] beam git commit: [BEAM-1347] Add a BagUserState implementation over the BeamFnStateClient

Posted by lc...@apache.org.
[BEAM-1347] Add a BagUserState implementation over the BeamFnStateClient

This closes #3760


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ba5c4071
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ba5c4071
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ba5c4071

Branch: refs/heads/master
Commit: ba5c4071dfa1831262bc01aa33f06ac0e5561484
Parents: f634aec 8d36a26
Author: Luke Cwik <lc...@google.com>
Authored: Mon Aug 28 09:54:38 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Aug 28 09:54:38 2017 -0700

----------------------------------------------------------------------
 .../beam/fn/harness/state/BagUserState.java     | 121 +++++++++++++++++++
 .../state/LazyCachingIteratorToIterable.java    |  72 +++++++++++
 .../beam/fn/harness/state/BagUserStateTest.java | 106 ++++++++++++++++
 .../fn/harness/state/FakeBeamFnStateClient.java | 110 +++++++++++++++++
 .../LazyCachingIteratorToIterableTest.java      |  76 ++++++++++++
 5 files changed, 485 insertions(+)
----------------------------------------------------------------------