You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by he...@apache.org on 2021/04/30 22:31:09 UTC
[beam] branch release-2.30.0 updated: [BEAM-12238] Make StateBackedIterable Serializable. (#14660)
This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.30.0 by this push:
new b2df867 [BEAM-12238] Make StateBackedIterable Serializable. (#14660)
new b460c0e3 Merge pull request #14683 from robertwb/release-2.30.0
b2df867 is described below
commit b2df867ff6bf298832f66483ba29e29d325fd342
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Thu Apr 29 08:42:07 2021 -0700
[BEAM-12238] Make StateBackedIterable Serializable. (#14660)
As we're materializing the whole thing, there's no need to return an object of the same type. (These are only released in the public API as Iterables.)
---
.../beam/fn/harness/state/StateBackedIterable.java | 11 +++++--
.../fn/harness/state/StateBackedIterableTest.java | 34 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 2 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
index 8df1cfa..6c6863a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
@@ -23,7 +23,9 @@ import com.google.auto.service.AutoService;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectStreamException;
import java.io.OutputStream;
+import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -39,6 +41,7 @@ import org.apache.beam.sdk.util.BufferedElementCountingOutputStream;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
@@ -56,9 +59,9 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
@SuppressWarnings({
"rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
})
-public class StateBackedIterable<T> implements Iterable<T> {
+public class StateBackedIterable<T> implements Iterable<T>, Serializable {
- private final BeamFnStateClient beamFnStateClient;
+ private final transient BeamFnStateClient beamFnStateClient;
private final org.apache.beam.sdk.coders.Coder<T> elemCoder;
@VisibleForTesting final StateRequest request;
@VisibleForTesting final List<T> prefix;
@@ -92,6 +95,10 @@ public class StateBackedIterable<T> implements Iterable<T> {
StateFetchingIterators.readAllStartingFrom(beamFnStateClient, request))));
}
+ protected Object writeReplace() throws ObjectStreamException {
+ return ImmutableList.copyOf(this);
+ }
+
/**
* Decodes an {@link Iterable} that might be backed by state. If the terminator at the end of the
* value stream is {@code -1} then we return a {@link StateBackedIterable} otherwise we return an
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
index 1a9d902..d863e3b 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -171,6 +173,38 @@ public class StateBackedIterableTest {
assertEquals(iterable.prefix, result.prefix);
assertEquals(iterable.request, result.request);
}
+
+ @Test
+ public void testSerializability() throws Exception {
+ FakeBeamFnStateClient fakeBeamFnStateClient =
+ new FakeBeamFnStateClient(
+ ImmutableMap.of(
+ key("suffix"), encode("C", "D", "E"),
+ key("emptySuffix"), encode()));
+
+ StateBackedIterable<String> iterable =
+ new StateBackedIterable<>(
+ fakeBeamFnStateClient,
+ "instruction",
+ encode("suffix"),
+ StringUtf8Coder.of(),
+ ImmutableList.of("A", "B"));
+
+ List<String> expected = ImmutableList.of("A", "B", "C", "D", "E");
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bout);
+ out.writeObject(iterable);
+ out.flush();
+ ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+ ObjectInputStream in = new ObjectInputStream(bin);
+ Iterable<String> deserialized = (Iterable<String>) in.readObject();
+
+ // Check that the contents are the same.
+ assertEquals(expected, Lists.newArrayList(deserialized));
+ // Check that we can still iterate over it as before.
+ assertEquals(expected, Lists.newArrayList(iterable));
+ }
}
private static StateKey key(String id) throws IOException {