You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by he...@apache.org on 2021/04/30 22:31:09 UTC

[beam] branch release-2.30.0 updated: [BEAM-12238] Make StateBackedIterable Serializable. (#14660)

This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.30.0 by this push:
     new b2df867  [BEAM-12238] Make StateBackedIterable Serializable. (#14660)
     new b460c0e3 Merge pull request #14683 from robertwb/release-2.30.0
b2df867 is described below

commit b2df867ff6bf298832f66483ba29e29d325fd342
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Thu Apr 29 08:42:07 2021 -0700

    [BEAM-12238] Make StateBackedIterable Serializable. (#14660)
    
    As we're materializing the whole iterable when it is serialized, there's no need to return an object of the same type. (These are only exposed in the public API as Iterables.)
---
 .../beam/fn/harness/state/StateBackedIterable.java | 11 +++++--
 .../fn/harness/state/StateBackedIterableTest.java  | 34 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
index 8df1cfa..6c6863a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
@@ -23,7 +23,9 @@ import com.google.auto.service.AutoService;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectStreamException;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -39,6 +41,7 @@ import org.apache.beam.sdk.util.BufferedElementCountingOutputStream;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
@@ -56,9 +59,9 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 @SuppressWarnings({
   "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
 })
-public class StateBackedIterable<T> implements Iterable<T> {
+public class StateBackedIterable<T> implements Iterable<T>, Serializable {
 
-  private final BeamFnStateClient beamFnStateClient;
+  private final transient BeamFnStateClient beamFnStateClient;
   private final org.apache.beam.sdk.coders.Coder<T> elemCoder;
   @VisibleForTesting final StateRequest request;
   @VisibleForTesting final List<T> prefix;
@@ -92,6 +95,10 @@ public class StateBackedIterable<T> implements Iterable<T> {
                 StateFetchingIterators.readAllStartingFrom(beamFnStateClient, request))));
   }
 
+  protected Object writeReplace() throws ObjectStreamException {
+    return ImmutableList.copyOf(this);
+  }
+
   /**
    * Decodes an {@link Iterable} that might be backed by state. If the terminator at the end of the
    * value stream is {@code -1} then we return a {@link StateBackedIterable} otherwise we return an
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
index 1a9d902..d863e3b 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -171,6 +173,38 @@ public class StateBackedIterableTest {
       assertEquals(iterable.prefix, result.prefix);
       assertEquals(iterable.request, result.request);
     }
+
+    @Test
+    public void testSerializability() throws Exception {
+      FakeBeamFnStateClient fakeBeamFnStateClient =
+          new FakeBeamFnStateClient(
+              ImmutableMap.of(
+                  key("suffix"), encode("C", "D", "E"),
+                  key("emptySuffix"), encode()));
+
+      StateBackedIterable<String> iterable =
+          new StateBackedIterable<>(
+              fakeBeamFnStateClient,
+              "instruction",
+              encode("suffix"),
+              StringUtf8Coder.of(),
+              ImmutableList.of("A", "B"));
+
+      List<String> expected = ImmutableList.of("A", "B", "C", "D", "E");
+
+      ByteArrayOutputStream bout = new ByteArrayOutputStream();
+      ObjectOutputStream out = new ObjectOutputStream(bout);
+      out.writeObject(iterable);
+      out.flush();
+      ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+      ObjectInputStream in = new ObjectInputStream(bin);
+      Iterable<String> deserialized = (Iterable<String>) in.readObject();
+
+      // Check that the contents are the same.
+      assertEquals(expected, Lists.newArrayList(deserialized));
+      // Check that we can still iterate over it as before.
+      assertEquals(expected, Lists.newArrayList(iterable));
+    }
   }
 
   private static StateKey key(String id) throws IOException {